# -*- coding: utf-8 -*- # This file is part of the Rocket Web Server # Copyright (c) 2011 Timothy Farrell # MIT License # Import System Modules import sys import errno import socket import logging import platform import traceback # Define Constants VERSION = '1.2.4' SERVER_NAME = socket.gethostname() SERVER_SOFTWARE = 'Rocket %s' % VERSION HTTP_SERVER_SOFTWARE = '%s Python/%s' % (SERVER_SOFTWARE, sys.version.split(' ')[0]) BUF_SIZE = 16384 SOCKET_TIMEOUT = 1 # in secs THREAD_STOP_CHECK_INTERVAL = 1 # in secs, How often should threads check for a server stop message? IS_JYTHON = platform.system() == 'Java' # Handle special cases for Jython IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET]) DEFAULT_LISTEN_QUEUE_SIZE = 5 DEFAULT_MIN_THREADS = 10 DEFAULT_MAX_THREADS = 0 DEFAULTS = dict(LISTEN_QUEUE_SIZE = DEFAULT_LISTEN_QUEUE_SIZE, MIN_THREADS = DEFAULT_MIN_THREADS, MAX_THREADS = DEFAULT_MAX_THREADS) PY3K = sys.version_info[0] > 2 class NullHandler(logging.Handler): "A Logging handler to prevent library errors." def emit(self, record): pass if PY3K: def b(val): """ Convert string/unicode/bytes literals into bytes. This allows for the same code to run on Python 2.x and 3.x. """ if isinstance(val, str): return val.encode() else: return val def u(val, encoding="us-ascii"): """ Convert bytes into string/unicode. This allows for the same code to run on Python 2.x and 3.x. """ if isinstance(val, bytes): return val.decode(encoding) else: return val else: def b(val): """ Convert string/unicode/bytes literals into bytes. This allows for the same code to run on Python 2.x and 3.x. """ if isinstance(val, unicode): return val.encode() else: return val def u(val, encoding="us-ascii"): """ Convert bytes into string/unicode. This allows for the same code to run on Python 2.x and 3.x. """ if isinstance(val, str): return val.decode(encoding) else: return val # Import Package Modules # package imports removed in monolithic build __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE', 'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u', 'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler'] # Monolithic build...end of module: rocket\__init__.py # Monolithic build...start of module: rocket\connection.py # Import System Modules import sys import time import socket try: import ssl has_ssl = True except ImportError: has_ssl = False # Import Package Modules # package imports removed in monolithic build # TODO - This part is still very experimental. #from .filelike import FileLikeSocket class Connection(object): __slots__ = [ 'setblocking', 'sendall', 'shutdown', 'makefile', 'fileno', 'client_addr', 'client_port', 'server_port', 'socket', 'start_time', 'ssl', 'secure', 'recv', 'send', 'read', 'write' ] def __init__(self, sock_tuple, port, secure=False): self.client_addr, self.client_port = sock_tuple[1] self.server_port = port self.socket = sock_tuple[0] self.start_time = time.time() self.ssl = has_ssl and isinstance(self.socket, ssl.SSLSocket) self.secure = secure if IS_JYTHON: # In Jython we must set TCP_NODELAY here since it does not # inherit from the listening socket. # See: http://bugs.jython.org/issue1309 self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self.socket.settimeout(SOCKET_TIMEOUT) self.sendall = self.socket.sendall self.shutdown = self.socket.shutdown self.fileno = self.socket.fileno self.setblocking = self.socket.setblocking self.recv = self.socket.recv self.send = self.socket.send self.makefile = self.socket.makefile # FIXME - this is not ready for prime-time yet. # def makefile(self, buf_size=BUF_SIZE): # return FileLikeSocket(self, buf_size) def close(self): if hasattr(self.socket, '_sock'): try: self.socket._sock.close() except socket.error: info = sys.exc_info() if info[1].args[0] != socket.EBADF: raise info[1] else: pass self.socket.close() # Monolithic build...end of module: rocket\connection.py # Monolithic build...start of module: rocket\filelike.py # Import System Modules import socket try: from io import StringIO except ImportError: try: from cStringIO import StringIO except ImportError: from StringIO import StringIO # Import Package Modules # package imports removed in monolithic build class FileLikeSocket(object): def __init__(self, conn, buf_size=BUF_SIZE): self.conn = conn self.buf_size = buf_size self.buffer = StringIO() self.content_length = None if self.conn.socket.gettimeout() == 0.0: self.read = self.non_blocking_read else: self.read = self.blocking_read def __iter__(self): return self def recv(self, size): while True: try: return self.conn.recv(size) except socket.error: exc = sys.exc_info() e = exc[1] # FIXME - Don't raise socket_errors_nonblocking or socket_error_eintr if (e.args[0] not in set()): raise def next(self): data = self.readline() if data == '': raise StopIteration return data def non_blocking_read(self, size=None): # Shamelessly adapted from Cherrypy! bufr = self.buffer bufr.seek(0, 2) if size is None: while True: data = self.recv(self.buf_size) if not data: break bufr.write(data) self.buffer = StringIO() return bufr.getvalue() else: buf_len = self.buffer.tell() if buf_len >= size: bufr.seek(0) data = bufr.read(size) self.buffer = StringIO(bufr.read()) return data self.buffer = StringIO() while True: remaining = size - buf_len data = self.recv(remaining) if not data: break n = len(data) if n == size and not buf_len: return data if n == remaining: bufr.write(data) del data break bufr.write(data) buf_len += n del data return bufr.getvalue() def blocking_read(self, length=None): if length is None: if self.content_length is not None: length = self.content_length else: length = 1 try: data = self.conn.recv(length) except: data = b('') return data def readline(self): data = b("") char = self.read(1) while char != b('\n') and char is not b(''): line = repr(char) data += char char = self.read(1) data += char return data def readlines(self, hint="ignored"): return list(self) def close(self): self.conn = None self.content_length = None # Monolithic build...end of module: rocket\filelike.py # Monolithic build...start of module: rocket\futures.py # Import System Modules import time try: from concurrent.futures import Future, ThreadPoolExecutor from concurrent.futures.thread import _WorkItem has_futures = True except ImportError: has_futures = False class Future: pass class ThreadPoolExecutor: pass class _WorkItem: pass class WSGIFuture(Future): def __init__(self, f_dict, *args, **kwargs): Future.__init__(self, *args, **kwargs) self.timeout = None self._mem_dict = f_dict self._lifespan = 30 self._name = None self._start_time = time.time() def set_running_or_notify_cancel(self): if time.time() - self._start_time >= self._lifespan: self.cancel() else: return super(WSGIFuture, self).set_running_or_notify_cancel() def remember(self, name, lifespan=None): self._lifespan = lifespan or self._lifespan if name in self._mem_dict: raise NameError('Cannot remember future by name "%s". ' % name + \ 'A future already exists with that name.' ) self._name = name self._mem_dict[name] = self return self def forget(self): if self._name in self._mem_dict and self._mem_dict[self._name] is self: del self._mem_dict[self._name] self._name = None class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future self.fn = fn self.args = args self.kwargs = kwargs def run(self): if not self.future.set_running_or_notify_cancel(): return try: result = self.fn(*self.args, **self.kwargs) except BaseException: e = sys.exc_info()[1] self.future.set_exception(e) else: self.future.set_result(result) class WSGIExecutor(ThreadPoolExecutor): multithread = True multiprocess = False def __init__(self, *args, **kwargs): ThreadPoolExecutor.__init__(self, *args, **kwargs) self.futures = dict() def submit(self, fn, *args, **kwargs): if self._shutdown_lock.acquire(): if self._shutdown: self._shutdown_lock.release() raise RuntimeError('Cannot schedule new futures after shutdown') f = WSGIFuture(self.futures) w = _WorkItem(f, fn, args, kwargs) self._work_queue.put(w) self._adjust_thread_count() self._shutdown_lock.release() return f else: return False class FuturesMiddleware(object): "Futures middleware that adds a Futures Executor to the environment" def __init__(self, app, threads=5): self.app = app self.executor = WSGIExecutor(threads) def __call__(self, environ, start_response): environ["wsgiorg.executor"] = self.executor environ["wsgiorg.futures"] = self.executor.futures return self.app(environ, start_response) # Monolithic build...end of module: rocket\futures.py # Monolithic build...start of module: rocket\listener.py # Import System Modules import os import socket import logging import traceback from threading import Thread try: import ssl from ssl import SSLError has_ssl = True except ImportError: has_ssl = False class SSLError(socket.error): pass # Import Package Modules # package imports removed in monolithic build class Listener(Thread): """The Listener class is a class responsible for accepting connections and queuing them to be processed by a worker thread.""" def __init__(self, interface, queue_size, active_queue, *args, **kwargs): Thread.__init__(self, *args, **kwargs) # Instance variables self.active_queue = active_queue self.interface = interface self.addr = interface[0] self.port = interface[1] self.secure = len(interface) >= 4 self.clientcert_req = (len(interface) == 5 and interface[4]) self.thread = None self.ready = False # Error Log self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port) self.err_log.addHandler(NullHandler()) # Build the socket listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if not listener: self.err_log.error("Failed to get socket.") return if self.secure: if not has_ssl: self.err_log.error("ssl module required to serve HTTPS.") return elif not os.path.exists(interface[2]): data = (interface[2], interface[0], interface[1]) self.err_log.error("Cannot find key file " "'%s'. Cannot bind to %s:%s" % data) return elif not os.path.exists(interface[3]): data = (interface[3], interface[0], interface[1]) self.err_log.error("Cannot find certificate file " "'%s'. Cannot bind to %s:%s" % data) return if self.clientcert_req and not os.path.exists(interface[4]): data = (interface[4], interface[0], interface[1]) self.err_log.error("Cannot find root ca certificate file " "'%s'. Cannot bind to %s:%s" % data) return # Set socket options try: listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except: msg = "Cannot share socket. Using %s:%i exclusively." self.err_log.warning(msg % (self.addr, self.port)) try: if not IS_JYTHON: listener.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) except: msg = "Cannot set TCP_NODELAY, things might run a little slower" self.err_log.warning(msg) try: listener.bind((self.addr, self.port)) except: msg = "Socket %s:%i in use by other process and it won't share." self.err_log.error(msg % (self.addr, self.port)) else: # We want socket operations to timeout periodically so we can # check if the server is shutting down listener.settimeout(THREAD_STOP_CHECK_INTERVAL) # Listen for new connections allowing queue_size number of # connections to wait before rejecting a connection. listener.listen(queue_size) self.listener = listener self.ready = True def wrap_socket(self, sock): try: if self.clientcert_req: ca_certs = self.interface[4] cert_reqs = ssl.CERT_OPTIONAL sock = ssl.wrap_socket(sock, keyfile = self.interface[2], certfile = self.interface[3], server_side = True, cert_reqs = cert_reqs, ca_certs = ca_certs, ssl_version = ssl.PROTOCOL_SSLv23) else: sock = ssl.wrap_socket(sock, keyfile = self.interface[2], certfile = self.interface[3], server_side = True, ssl_version = ssl.PROTOCOL_SSLv23) except SSLError: # Generally this happens when an HTTP request is received on a # secure socket. We don't do anything because it will be detected # by Worker and dealt with appropriately. # self.err_log.error('SSL Error: %s' % traceback.format_exc()) pass return sock def start(self): if not self.ready: self.err_log.warning('Listener started when not ready.') return if self.thread is not None and self.thread.isAlive(): self.err_log.warning('Listener already running.') return self.thread = Thread(target=self.listen, name="Port" + str(self.port)) self.thread.start() def isAlive(self): if self.thread is None: return False return self.thread.isAlive() def join(self): if self.thread is None: return self.ready = False self.thread.join() del self.thread self.thread = None self.ready = True def listen(self): if __debug__: self.err_log.debug('Entering main loop.') while True: try: sock, addr = self.listener.accept() if self.secure: sock = self.wrap_socket(sock) self.active_queue.put(((sock, addr), self.interface[1], self.secure)) except socket.timeout: # socket.timeout will be raised every THREAD_STOP_CHECK_INTERVAL # seconds. When that happens, we check if it's time to die. if not self.ready: if __debug__: self.err_log.debug('Listener exiting.') return else: continue except: self.err_log.error(traceback.format_exc()) # Monolithic build...end of module: rocket\listener.py # Monolithic build...start of module: rocket\main.py # Import System Modules import sys import time import socket import logging import traceback from threading import Lock try: from queue import Queue except ImportError: from Queue import Queue # Import Package Modules # package imports removed in monolithic build # Setup Logging log = logging.getLogger('Rocket') log.addHandler(NullHandler()) class Rocket(object): """The Rocket class is responsible for handling threads and accepting and dispatching connections.""" def __init__(self, interfaces = ('127.0.0.1', 8000), method = 'wsgi', app_info = None, min_threads = None, max_threads = None, queue_size = None, timeout = 600, handle_signals = True): self.handle_signals = handle_signals self.startstop_lock = Lock() self.timeout = timeout if not isinstance(interfaces, list): self.interfaces = [interfaces] else: self.interfaces = interfaces if min_threads is None: min_threads = DEFAULTS['MIN_THREADS'] if max_threads is None: max_threads = DEFAULTS['MAX_THREADS'] if not queue_size: if hasattr(socket, 'SOMAXCONN'): queue_size = socket.SOMAXCONN else: queue_size = DEFAULTS['LISTEN_QUEUE_SIZE'] if max_threads and queue_size > max_threads: queue_size = max_threads if isinstance(app_info, dict): app_info['server_software'] = SERVER_SOFTWARE self.monitor_queue = Queue() self.active_queue = Queue() self._threadpool = ThreadPool(get_method(method), app_info = app_info, active_queue = self.active_queue, monitor_queue = self.monitor_queue, min_threads = min_threads, max_threads = max_threads) # Build our socket listeners self.listeners = [Listener(i, queue_size, self.active_queue) for i in self.interfaces] for ndx in range(len(self.listeners)-1, 0, -1): if not self.listeners[ndx].ready: del self.listeners[ndx] if not self.listeners: log.critical("No interfaces to listen on...closing.") sys.exit(1) def _sigterm(self, signum, frame): log.info('Received SIGTERM') self.stop() def _sighup(self, signum, frame): log.info('Received SIGHUP') self.restart() def start(self, background=False): log.info('Starting %s' % SERVER_SOFTWARE) self.startstop_lock.acquire() try: # Set up our shutdown signals if self.handle_signals: try: import signal signal.signal(signal.SIGTERM, self._sigterm) signal.signal(signal.SIGUSR1, self._sighup) except: log.debug('This platform does not support signals.') # Start our worker threads self._threadpool.start() # Start our monitor thread self._monitor = Monitor(self.monitor_queue, self.active_queue, self.timeout, self._threadpool) self._monitor.setDaemon(True) self._monitor.start() # I know that EXPR and A or B is bad but I'm keeping it for Py2.4 # compatibility. str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '') msg = 'Listening on sockets: ' msg += ', '.join(['%s:%i%s' % str_extract(l) for l in self.listeners]) log.info(msg) for l in self.listeners: l.start() finally: self.startstop_lock.release() if background: return while self._monitor.isAlive(): try: time.sleep(THREAD_STOP_CHECK_INTERVAL) except KeyboardInterrupt: # Capture a keyboard interrupt when running from a console break except: if self._monitor.isAlive(): log.error(traceback.format_exc()) continue return self.stop() def stop(self, stoplogging = False): log.info('Stopping %s' % SERVER_SOFTWARE) self.startstop_lock.acquire() try: # Stop listeners for l in self.listeners: l.ready = False # Encourage a context switch time.sleep(0.01) for l in self.listeners: if l.isAlive(): l.join() # Stop Monitor self._monitor.stop() if self._monitor.isAlive(): self._monitor.join() # Stop Worker threads self._threadpool.stop() if stoplogging: logging.shutdown() msg = "Calling logging.shutdown() is now the responsibility of \ the application developer. Please update your \ applications to no longer call rocket.stop(True)" try: import warnings raise warnings.DeprecationWarning(msg) except ImportError: raise RuntimeError(msg) finally: self.startstop_lock.release() def restart(self): self.stop() self.start() def CherryPyWSGIServer(bind_addr, wsgi_app, numthreads = 10, server_name = None, max = -1, request_queue_size = 5, timeout = 10, shutdown_timeout = 5): """ A Cherrypy wsgiserver-compatible wrapper. """ max_threads = max if max_threads < 0: max_threads = 0 return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app}, min_threads = numthreads, max_threads = max_threads, queue_size = request_queue_size, timeout = timeout) # Monolithic build...end of module: rocket\main.py # Monolithic build...start of module: rocket\monitor.py # Import System Modules import time import logging import select from threading import Thread # Import Package Modules # package imports removed in monolithic build class Monitor(Thread): # Monitor worker class. def __init__(self, monitor_queue, active_queue, timeout, threadpool, *args, **kwargs): Thread.__init__(self, *args, **kwargs) self._threadpool = threadpool # Instance Variables self.monitor_queue = monitor_queue self.active_queue = active_queue self.timeout = timeout self.log = logging.getLogger('Rocket.Monitor') self.log.addHandler(NullHandler()) self.connections = set() self.active = False def run(self): self.active = True conn_list = list() list_changed = False # We need to make sure the queue is empty before we start while not self.monitor_queue.empty(): self.monitor_queue.get() if __debug__: self.log.debug('Entering monitor loop.') # Enter thread main loop while self.active: # Move the queued connections to the selection pool while not self.monitor_queue.empty(): if __debug__: self.log.debug('In "receive timed-out connections" loop.') c = self.monitor_queue.get() if c is None: # A non-client is a signal to die if __debug__: self.log.debug('Received a death threat.') self.stop() break self.log.debug('Received a timed out connection.') if __debug__: assert(c not in self.connections) if IS_JYTHON: # Jython requires a socket to be in Non-blocking mode in # order to select on it. c.setblocking(False) if __debug__: self.log.debug('Adding connection to monitor list.') self.connections.add(c) list_changed = True # Wait on those connections if list_changed: conn_list = list(self.connections) list_changed = False try: if len(conn_list): readable = select.select(conn_list, [], [], THREAD_STOP_CHECK_INTERVAL)[0] else: time.sleep(THREAD_STOP_CHECK_INTERVAL) readable = [] if not self.active: break # If we have any readable connections, put them back for r in readable: if __debug__: self.log.debug('Restoring readable connection') if IS_JYTHON: # Jython requires a socket to be in Non-blocking mode in # order to select on it, but the rest of the code requires # that it be in blocking mode. r.setblocking(True) r.start_time = time.time() self.active_queue.put(r) self.connections.remove(r) list_changed = True except: if self.active: raise else: break # If we have any stale connections, kill them off. if self.timeout: now = time.time() stale = set() for c in self.connections: if (now - c.start_time) >= self.timeout: stale.add(c) for c in stale: if __debug__: # "EXPR and A or B" kept for Py2.4 compatibility data = (c.client_addr, c.server_port, c.ssl and '*' or '') self.log.debug('Flushing stale connection: %s:%i%s' % data) self.connections.remove(c) list_changed = True try: c.close() finally: del c # Dynamically resize the threadpool to adapt to our changing needs. self._threadpool.dynamic_resize() def stop(self): self.active = False if __debug__: self.log.debug('Flushing waiting connections') while self.connections: c = self.connections.pop() try: c.close() finally: del c if __debug__: self.log.debug('Flushing queued connections') while not self.monitor_queue.empty(): c = self.monitor_queue.get() if c is None: continue try: c.close() finally: del c # Place a None sentry value to cause the monitor to die. self.monitor_queue.put(None) # Monolithic build...end of module: rocket\monitor.py # Monolithic build...start of module: rocket\threadpool.py # Import System Modules import logging # Import Package Modules # package imports removed in monolithic build # Setup Logging log = logging.getLogger('Rocket.Errors.ThreadPool') log.addHandler(NullHandler()) class ThreadPool: """The ThreadPool class is a container class for all the worker threads. It manages the number of actively running threads.""" def __init__(self, method, app_info, active_queue, monitor_queue, min_threads=DEFAULTS['MIN_THREADS'], max_threads=DEFAULTS['MAX_THREADS'], ): if __debug__: log.debug("Initializing ThreadPool.") self.check_for_dead_threads = 0 self.active_queue = active_queue self.worker_class = method self.min_threads = min_threads self.max_threads = max_threads self.monitor_queue = monitor_queue self.stop_server = False self.alive = False # TODO - Optimize this based on some real-world usage data self.grow_threshold = int(max_threads/10) + 2 if not isinstance(app_info, dict): app_info = dict() if has_futures and app_info.get('futures'): app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'], 2])) app_info.update(max_threads=max_threads, min_threads=min_threads) self.min_threads = min_threads self.app_info = app_info self.threads = set() def start(self): self.stop_server = False if __debug__: log.debug("Starting threads.") self.grow(self.min_threads) self.alive = True def stop(self): self.alive = False if __debug__: log.debug("Stopping threads.") self.stop_server = True # Prompt the threads to die self.shrink(len(self.threads)) # Stop futures initially if has_futures and self.app_info.get('futures'): if __debug__: log.debug("Future executor is present. Python will not " "exit until all jobs have finished.") self.app_info['executor'].shutdown(wait=False) # Give them the gun #active_threads = [t for t in self.threads if t.isAlive()] #while active_threads: # t = active_threads.pop() # t.kill() # Wait until they pull the trigger for t in self.threads: if t.isAlive(): t.join() # Clean up the mess self.bring_out_your_dead() def bring_out_your_dead(self): # Remove dead threads from the pool dead_threads = [t for t in self.threads if not t.isAlive()] for t in dead_threads: if __debug__: log.debug("Removing dead thread: %s." % t.getName()) try: # Py2.4 complains here so we put it in a try block self.threads.remove(t) except: pass self.check_for_dead_threads -= len(dead_threads) def grow(self, amount=None): if self.stop_server: return if not amount: amount = self.max_threads if self.alive: amount = min([amount, self.max_threads - len(self.threads)]) if __debug__: log.debug("Growing by %i." % amount) for x in range(amount): worker = self.worker_class(self.app_info, self.active_queue, self.monitor_queue) worker.setDaemon(True) self.threads.add(worker) worker.start() def shrink(self, amount=1): if __debug__: log.debug("Shrinking by %i." % amount) self.check_for_dead_threads += amount for x in range(amount): self.active_queue.put(None) def dynamic_resize(self): if (self.max_threads > self.min_threads or self.max_threads == 0): if self.check_for_dead_threads > 0: self.bring_out_your_dead() queueSize = self.active_queue.qsize() threadCount = len(self.threads) if __debug__: log.debug("Examining ThreadPool. %i threads and %i Q'd conxions" % (threadCount, queueSize)) if queueSize == 0 and threadCount > self.min_threads: self.shrink() elif queueSize > self.grow_threshold: self.grow(queueSize) # Monolithic build...end of module: rocket\threadpool.py # Monolithic build...start of module: rocket\worker.py # Import System Modules import re import sys import socket import logging import traceback from wsgiref.headers import Headers from threading import Thread from datetime import datetime try: from urllib import unquote except ImportError: from urllib.parse import unquote try: from io import StringIO except ImportError: try: from cStringIO import StringIO except ImportError: from StringIO import StringIO try: from ssl import SSLError except ImportError: class SSLError(socket.error): pass # Import Package Modules # package imports removed in monolithic build # Define Constants re_SLASH = re.compile('%2F', re.IGNORECASE) re_REQUEST_LINE = re.compile(r"""^ (?POPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT) # Request Method \ # (single space) ( (?P[^:/]+) # Scheme (://) # (?P[^/]+) # Host )? # (?P(\*|/[^ \?]*)) # Path (\? (?P[^ ]+))? # Query String \ # (single space) (?PHTTPS?/1\.[01]) # Protocol $ """, re.X) LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s' RESPONSE = '''\ HTTP/1.1 %s Content-Length: %i Content-Type: %s %s ''' if IS_JYTHON: HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE', 'CONNECT']) class Worker(Thread): """The Worker class is a base class responsible for receiving connections and (a subclass) will run an application to process the the connection """ def __init__(self, app_info, active_queue, monitor_queue, *args, **kwargs): Thread.__init__(self, *args, **kwargs) # Instance Variables self.app_info = app_info self.active_queue = active_queue self.monitor_queue = monitor_queue self.size = 0 self.status = "200 OK" self.closeConnection = True self.request_line = "" # Request Log self.req_log = logging.getLogger('Rocket.Requests') self.req_log.addHandler(NullHandler()) # Error Log self.err_log = logging.getLogger('Rocket.Errors.'+self.getName()) self.err_log.addHandler(NullHandler()) def _handleError(self, typ, val, tb): if typ == SSLError: if 'timed out' in val.args[0]: typ = SocketTimeout if typ == SocketTimeout: if __debug__: self.err_log.debug('Socket timed out') self.monitor_queue.put(self.conn) return True if typ == SocketClosed: self.closeConnection = True if __debug__: self.err_log.debug('Client closed socket') return False if typ == BadRequest: self.closeConnection = True if __debug__: self.err_log.debug('Client sent a bad request') return True if typ == socket.error: self.closeConnection = True if val.args[0] in IGNORE_ERRORS_ON_CLOSE: if __debug__: self.err_log.debug('Ignorable socket Error received...' 'closing connection.') return False else: self.status = "999 Utter Server Failure" tb_fmt = traceback.format_exception(typ, val, tb) self.err_log.error('Unhandled Error when serving ' 'connection:\n' + '\n'.join(tb_fmt)) return False self.closeConnection = True tb_fmt = traceback.format_exception(typ, val, tb) self.err_log.error('\n'.join(tb_fmt)) self.send_response('500 Server Error') return False def run(self): if __debug__: self.err_log.debug('Entering main loop.') # Enter thread main loop while True: conn = self.active_queue.get() if not conn: # A non-client is a signal to die if __debug__: self.err_log.debug('Received a death threat.') return conn if isinstance(conn, tuple): conn = Connection(*conn) self.conn = conn if conn.ssl != conn.secure: self.err_log.info('Received HTTP connection on HTTPS port.') self.send_response('400 Bad Request') self.closeConnection = True conn.close() continue else: if __debug__: self.err_log.debug('Received a connection.') self.closeConnection = False # Enter connection serve loop while True: if __debug__: self.err_log.debug('Serving a request') try: self.run_app(conn) log_info = dict(client_ip = conn.client_addr, time = datetime.now().strftime('%c'), status = self.status.split(' ')[0], size = self.size, request_line = self.request_line) self.req_log.info(LOG_LINE % log_info) except: exc = sys.exc_info() handled = self._handleError(*exc) if handled: break else: if self.request_line: log_info = dict(client_ip = conn.client_addr, time = datetime.now().strftime('%c'), status = self.status.split(' ')[0], size = self.size, request_line = self.request_line + ' - not stopping') self.req_log.info(LOG_LINE % log_info) if self.closeConnection: try: conn.close() except: self.err_log.error(str(traceback.format_exc())) break def run_app(self, conn): # Must be overridden with a method reads the request from the socket # and sends a response. self.closeConnection = True raise NotImplementedError('Overload this method!') def send_response(self, status): stat_msg = status.split(' ', 1)[1] msg = RESPONSE % (status, len(stat_msg), 'text/plain', stat_msg) try: self.conn.sendall(b(msg)) except socket.error: self.closeConnection = True self.err_log.error('Tried to send "%s" to client but received socket' ' error' % status) #def kill(self): # if self.isAlive() and hasattr(self, 'conn'): # try: # self.conn.shutdown(socket.SHUT_RDWR) # except socket.error: # info = sys.exc_info() # if info[1].args[0] != socket.EBADF: # self.err_log.debug('Error on shutdown: '+str(info)) def read_request_line(self, sock_file): self.request_line = '' try: # Grab the request line d = sock_file.readline() if PY3K: d = d.decode('ISO-8859-1') if d == '\r\n': # Allow an extra NEWLINE at the beginning per HTTP 1.1 spec if __debug__: self.err_log.debug('Client sent newline') d = sock_file.readline() if PY3K: d = d.decode('ISO-8859-1') except socket.timeout: raise SocketTimeout("Socket timed out before request.") d = d.strip() if not d: if __debug__: self.err_log.debug('Client did not send a recognizable request.') raise SocketClosed('Client closed socket.') self.request_line = d # NOTE: I've replaced the traditional method of procedurally breaking # apart the request line with a (rather unsightly) regular expression. # However, Java's regexp support sucks so bad that it actually takes # longer in Jython to process the regexp than procedurally. So I've # left the old code here for Jython's sake...for now. if IS_JYTHON: return self._read_request_line_jython(d) match = re_REQUEST_LINE.match(d) if not match: self.send_response('400 Bad Request') raise BadRequest req = match.groupdict() for k,v in req.items(): if not v: req[k] = "" if k == 'path': req['path'] = r'%2F'.join([unquote(x) for x in re_SLASH.split(v)]) return req def _read_request_line_jython(self, d): d = d.strip() try: method, uri, proto = d.split(' ') if not proto.startswith('HTTP') or \ proto[-3:] not in ('1.0', '1.1') or \ method not in HTTP_METHODS: self.send_response('400 Bad Request') raise BadRequest except ValueError: self.send_response('400 Bad Request') raise BadRequest req = dict(method=method, protocol = proto) scheme = '' host = '' if uri == '*' or uri.startswith('/'): path = uri elif '://' in uri: scheme, rest = uri.split('://') host, path = rest.split('/', 1) path = '/' + path else: self.send_response('400 Bad Request') raise BadRequest query_string = '' if '?' in path: path, query_string = path.split('?', 1) path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)]) req.update(path=path, query_string=query_string, scheme=scheme.lower(), host=host) return req def read_headers(self, sock_file): try: headers = dict() l = sock_file.readline() lname = None lval = None while True: if PY3K: try: l = str(l, 'ISO-8859-1') except UnicodeDecodeError: self.err_log.warning('Client sent invalid header: ' + repr(l)) if l == '\r\n': break if l[0] in ' \t' and lname: # Some headers take more than one line lval += ',' + l.strip() else: # HTTP header values are latin-1 encoded l = l.split(':', 1) # HTTP header names are us-ascii encoded lname = l[0].strip().upper().replace('-', '_') lval = l[-1].strip() headers[str(lname)] = str(lval) l = sock_file.readline() except socket.timeout: raise SocketTimeout("Socket timed out before request.") return headers class SocketTimeout(Exception): "Exception for when a socket times out between requests." pass class BadRequest(Exception): "Exception for when a client sends an incomprehensible request." pass class SocketClosed(Exception): "Exception for when a socket is closed by the client." pass class ChunkedReader(object): def __init__(self, sock_file): self.stream = sock_file self.chunk_size = 0 def _read_header(self): chunk_len = "" try: while "" == chunk_len: chunk_len = self.stream.readline().strip() return int(chunk_len, 16) except ValueError: return 0 def read(self, size): data = b('') chunk_size = self.chunk_size while size: if not chunk_size: chunk_size = self._read_header() if size < chunk_size: data += self.stream.read(size) chunk_size -= size break else: if not chunk_size: break data += self.stream.read(chunk_size) size -= chunk_size chunk_size = 0 self.chunk_size = chunk_size return data def readline(self): data = b('') c = self.read(1) while c and c != b('\n'): data += c c = self.read(1) data += c return data def readlines(self): yield self.readline() def get_method(method): methods = dict(wsgi=WSGIWorker, fs=FileSystemWorker) return methods[method.lower()] # Monolithic build...end of module: rocket\worker.py # Monolithic build...start of module: rocket\methods\__init__.py # Monolithic build...end of module: rocket\methods\__init__.py # Monolithic build...start of module: rocket\methods\fs.py # Import System Modules import os import time import mimetypes from email.utils import formatdate from wsgiref.headers import Headers from wsgiref.util import FileWrapper # Import Package Modules # package imports removed in monolithic build # Define Constants CHUNK_SIZE = 2**16 # 64 Kilobyte chunks HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s''' INDEX_HEADER = '''\ Directory Index: %(path)s

Directory Index: %(path)s

''' INDEX_ROW = '''''' INDEX_FOOTER = '''
Directories
\r\n''' class LimitingFileWrapper(FileWrapper): def __init__(self, limit=None, *args, **kwargs): self.limit = limit FileWrapper.__init__(self, *args, **kwargs) def read(self, amt): if amt > self.limit: amt = self.limit self.limit -= amt return FileWrapper.read(self, amt) class FileSystemWorker(Worker): def __init__(self, *args, **kwargs): """Builds some instance variables that will last the life of the thread.""" Worker.__init__(self, *args, **kwargs) self.root = os.path.abspath(self.app_info['document_root']) self.display_index = self.app_info['display_index'] def serve_file(self, filepath, headers): filestat = os.stat(filepath) self.size = filestat.st_size modtime = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(filestat.st_mtime)) self.headers.add_header('Last-Modified', modtime) if headers.get('if_modified_since') == modtime: # The browser cache is up-to-date, send a 304. self.status = "304 Not Modified" self.data = [] return ct = mimetypes.guess_type(filepath)[0] self.content_type = ct if ct else 'text/plain' try: f = open(filepath, 'rb') self.headers['Pragma'] = 'cache' self.headers['Cache-Control'] = 'private' self.headers['Content-Length'] = str(self.size) if self.etag: self.headers.add_header('Etag', self.etag) if self.expires: self.headers.add_header('Expires', self.expires) try: # Implement 206 partial file support. start, end = headers['range'].split('-') start = 0 if not start.isdigit() else int(start) end = self.size if not end.isdigit() else int(end) if self.size < end or start < 0: self.status = "214 Unsatisfiable Range Requested" self.data = FileWrapper(f, CHUNK_SIZE) else: f.seek(start) self.data = LimitingFileWrapper(f, CHUNK_SIZE, limit=end) self.status = "206 Partial Content" except: self.data = FileWrapper(f, CHUNK_SIZE) except IOError: self.status = "403 Forbidden" def serve_dir(self, pth, rpth): def rel_path(path): return os.path.normpath(path[len(self.root):] if path.startswith(self.root) else path) if not self.display_index: self.status = '404 File Not Found' return b('') else: self.content_type = 'text/html' dir_contents = [os.path.join(pth, x) for x in os.listdir(os.path.normpath(pth))] dir_contents.sort() dirs = [rel_path(x)+'/' for x in dir_contents if os.path.isdir(x)] files = [rel_path(x) for x in dir_contents if os.path.isfile(x)] self.data = [INDEX_HEADER % dict(path='/'+rpth)] if rpth: self.data += [INDEX_ROW % dict(name='(parent directory)', cls='dir parent', link='/'.join(rpth[:-1].split('/')[:-1]))] self.data += [INDEX_ROW % dict(name=os.path.basename(x[:-1]), link=os.path.join(rpth, os.path.basename(x[:-1])).replace('\\', '/'), cls='dir') for x in dirs] self.data += ['Files'] self.data += [INDEX_ROW % dict(name=os.path.basename(x), link=os.path.join(rpth, os.path.basename(x)).replace('\\', '/'), cls='file') for x in files] self.data += [INDEX_FOOTER] self.headers['Content-Length'] = self.size = str(sum([len(x) for x in self.data])) self.status = '200 OK' def run_app(self, conn): self.status = "200 OK" self.size = 0 self.expires = None self.etag = None self.content_type = 'text/plain' self.content_length = None if __debug__: self.err_log.debug('Getting sock_file') # Build our file-like object sock_file = conn.makefile('rb',BUF_SIZE) request = self.read_request_line(sock_file) if request['method'].upper() not in ('GET', ): self.status = "501 Not Implemented" try: # Get our file path headers = dict([(str(k.lower()), v) for k, v in self.read_headers(sock_file).items()]) rpath = request.get('path', '').lstrip('/') filepath = os.path.join(self.root, rpath) filepath = os.path.abspath(filepath) if __debug__: self.err_log.debug('Request for path: %s' % filepath) self.closeConnection = headers.get('connection', 'close').lower() == 'close' self.headers = Headers([('Date', formatdate(usegmt=True)), ('Server', HTTP_SERVER_SOFTWARE), ('Connection', headers.get('connection', 'close')), ]) if not filepath.lower().startswith(self.root.lower()): # File must be within our root directory self.status = "400 Bad Request" self.closeConnection = True elif not os.path.exists(filepath): self.status = "404 File Not Found" self.closeConnection = True elif os.path.isdir(filepath): self.serve_dir(filepath, rpath) elif os.path.isfile(filepath): self.serve_file(filepath, headers) else: # It exists but it's not a file or a directory???? # What is it then? self.status = "501 Not Implemented" self.closeConnection = True h = self.headers statcode, statstr = self.status.split(' ', 1) statcode = int(statcode) if statcode >= 400: h.add_header('Content-Type', self.content_type) self.data = [statstr] # Build our output headers header_data = HEADER_RESPONSE % (self.status, str(h)) # Send the headers if __debug__: self.err_log.debug('Sending Headers: %s' % repr(header_data)) self.conn.sendall(b(header_data)) for data in self.data: self.conn.sendall(b(data)) if hasattr(self.data, 'close'): self.data.close() finally: if __debug__: self.err_log.debug('Finally closing sock_file') sock_file.close() # Monolithic build...end of module: rocket\methods\fs.py # Monolithic build...start of module: rocket\methods\wsgi.py # Import System Modules import sys import socket from wsgiref.headers import Headers from wsgiref.util import FileWrapper # Import Package Modules # package imports removed in monolithic build if PY3K: from email.utils import formatdate else: # Caps Utils for Py2.4 compatibility from email.Utils import formatdate # Define Constants NEWLINE = b('\r\n') HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s''' BASE_ENV = {'SERVER_NAME': SERVER_NAME, 'SCRIPT_NAME': '', # Direct call WSGI does not need a name 'wsgi.errors': sys.stderr, 'wsgi.version': (1, 0), 'wsgi.multiprocess': False, 'wsgi.run_once': False, 'wsgi.file_wrapper': FileWrapper } class WSGIWorker(Worker): def __init__(self, *args, **kwargs): """Builds some instance variables that will last the life of the thread.""" Worker.__init__(self, *args, **kwargs) if isinstance(self.app_info, dict): multithreaded = self.app_info.get('max_threads') != 1 else: multithreaded = False self.base_environ = dict({'SERVER_SOFTWARE': self.app_info['server_software'], 'wsgi.multithread': multithreaded, }) self.base_environ.update(BASE_ENV) # Grab our application self.app = self.app_info.get('wsgi_app') if not hasattr(self.app, "__call__"): raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app)) # Enable futures if has_futures and self.app_info.get('futures'): executor = self.app_info['executor'] self.base_environ.update({"wsgiorg.executor": executor, "wsgiorg.futures": executor.futures}) def build_environ(self, sock_file, conn): """ Build the execution environment. """ # Grab the request line request = self.read_request_line(sock_file) # Copy the Base Environment environ = self.base_environ.copy() # Grab the headers for k, v in self.read_headers(sock_file).items(): environ[str('HTTP_'+k)] = v # Add CGI Variables environ['REQUEST_METHOD'] = request['method'] environ['PATH_INFO'] = request['path'] environ['SERVER_PROTOCOL'] = request['protocol'] environ['SERVER_PORT'] = str(conn.server_port) environ['REMOTE_PORT'] = str(conn.client_port) environ['REMOTE_ADDR'] = str(conn.client_addr) environ['QUERY_STRING'] = request['query_string'] if 'HTTP_CONTENT_LENGTH' in environ: environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH'] if 'HTTP_CONTENT_TYPE' in environ: environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE'] # Save the request method for later self.request_method = environ['REQUEST_METHOD'] # Add Dynamic WSGI Variables if conn.ssl: environ['wsgi.url_scheme'] = 'https' environ['HTTPS'] = 'on' else: environ['wsgi.url_scheme'] = 'http' if conn.ssl: try: peercert = conn.socket.getpeercert(binary_form=True) environ['SSL_CLIENT_RAW_CERT'] = \ peercert and ssl.DER_cert_to_PEM_cert(peercert) except Exception,e: print e if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked': environ['wsgi.input'] = ChunkedReader(sock_file) else: environ['wsgi.input'] = sock_file return environ def send_headers(self, data, sections): h_set = self.header_set # Does the app want us to send output chunked? self.chunked = h_set.get('transfer-encoding', '').lower() == 'chunked' # Add a Date header if it's not there already if not 'date' in h_set: h_set['Date'] = formatdate(usegmt=True) # Add a Server header if it's not there already if not 'server' in h_set: h_set['Server'] = HTTP_SERVER_SOFTWARE if 'content-length' in h_set: self.size = int(h_set['content-length']) else: s = int(self.status.split(' ')[0]) if s < 200 or s not in (204, 205, 304): if not self.chunked: if sections == 1: # Add a Content-Length header if it's not there already h_set['Content-Length'] = str(len(data)) self.size = len(data) else: # If they sent us more than one section, we blow chunks h_set['Transfer-Encoding'] = 'Chunked' self.chunked = True if __debug__: self.err_log.debug('Adding header...' 'Transfer-Encoding: Chunked') if 'connection' not in h_set: # If the application did not provide a connection header, fill it in client_conn = self.environ.get('HTTP_CONNECTION', '').lower() if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1': # HTTP = 1.1 defaults to keep-alive connections if client_conn: h_set['Connection'] = client_conn else: h_set['Connection'] = 'keep-alive' else: # HTTP < 1.1 supports keep-alive but it's quirky so we don't support it h_set['Connection'] = 'close' # Close our connection if we need to. self.closeConnection = h_set.get('connection', '').lower() == 'close' # Build our output headers header_data = HEADER_RESPONSE % (self.status, str(h_set)) # Send the headers if __debug__: self.err_log.debug('Sending Headers: %s' % repr(header_data)) self.conn.sendall(b(header_data)) self.headers_sent = True def write_warning(self, data, sections=None): self.err_log.warning('WSGI app called write method directly. This is ' 'deprecated behavior. Please update your app.') return self.write(data, sections) def write(self, data, sections=None): """ Write the data to the output socket. """ if self.error[0]: self.status = self.error[0] data = b(self.error[1]) if not self.headers_sent: self.send_headers(data, sections) if self.request_method != 'HEAD': try: if self.chunked: self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data))) else: self.conn.sendall(data) except socket.error: # But some clients will close the connection before that # resulting in a socket error. self.closeConnection = True def start_response(self, status, response_headers, exc_info=None): """ Store the HTTP status and headers to be sent when self.write is called. """ if exc_info: try: if self.headers_sent: # Re-raise original exception if headers sent # because this violates WSGI specification. raise finally: exc_info = None elif self.header_set: raise AssertionError("Headers already set!") if PY3K and not isinstance(status, str): self.status = str(status, 'ISO-8859-1') else: self.status = status # Make sure headers are bytes objects try: self.header_set = Headers(response_headers) except UnicodeDecodeError: self.error = ('500 Internal Server Error', 'HTTP Headers should be bytes') self.err_log.error('Received HTTP Headers from client that contain' ' invalid characters for Latin-1 encoding.') return self.write_warning def run_app(self, conn): self.size = 0 self.header_set = Headers([]) self.headers_sent = False self.error = (None, None) self.chunked = False sections = None output = None if __debug__: self.err_log.debug('Getting sock_file') # Build our file-like object if PY3K: sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE) else: sock_file = conn.makefile(BUF_SIZE) try: # Read the headers and build our WSGI environment self.environ = environ = self.build_environ(sock_file, conn) # Handle 100 Continue if environ.get('HTTP_EXPECT', '') == '100-continue': res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n' conn.sendall(b(res)) # Send it to our WSGI application output = self.app(environ, self.start_response) if not hasattr(output, '__len__') and not hasattr(output, '__iter__'): self.error = ('500 Internal Server Error', 'WSGI applications must return a list or ' 'generator type.') if hasattr(output, '__len__'): sections = len(output) for data in output: # Don't send headers until body appears if data: self.write(data, sections) if self.chunked: # If chunked, send our final chunk length self.conn.sendall(b('0\r\n\r\n')) elif not self.headers_sent: # Send headers if the body was empty self.send_headers('', sections) # Don't capture exceptions here. The Worker class handles # them appropriately. finally: if __debug__: self.err_log.debug('Finally closing output and sock_file') if hasattr(output,'close'): output.close() sock_file.close() # Monolithic build...end of module: rocket\methods\wsgi.py # # the following code is not part of Rocket but was added in web2py for testing purposes # def demo_app(environ, start_response): global static_folder import os types = {'htm': 'text/html','html': 'text/html','gif': 'image/gif', 'jpg': 'image/jpeg','png': 'image/png','pdf': 'applications/pdf'} if static_folder: if not static_folder.startswith('/'): static_folder = os.path.join(os.getcwd(),static_folder) path = os.path.join(static_folder, environ['PATH_INFO'][1:] or 'index.html') type = types.get(path.split('.')[-1],'text') if os.path.exists(path): try: data = open(path,'rb').read() start_response('200 OK', [('Content-Type', type)]) except IOError: start_response('404 NOT FOUND', []) data = '404 NOT FOUND' else: start_response('500 INTERNAL SERVER ERROR', []) data = '500 INTERNAL SERVER ERROR' else: start_response('200 OK', [('Content-Type', 'text/html')]) data = '

Hello from Rocket Web Server

' return [data] def demo(): from optparse import OptionParser parser = OptionParser() parser.add_option("-i", "--ip", dest="ip",default="127.0.0.1", help="ip address of the network interface") parser.add_option("-p", "--port", dest="port",default="8000", help="post where to run web server") parser.add_option("-s", "--static", dest="static",default=None, help="folder containing static files") (options, args) = parser.parse_args() global static_folder static_folder = options.static print 'Rocket running on %s:%s' % (options.ip, options.port) r=Rocket((options.ip,int(options.port)),'wsgi', {'wsgi_app':demo_app}) r.start() if __name__=='__main__': demo()