mini-web框架-实现伪静态url
readme.txt(新建)
运行方式如下:
python3 web_server.py 7890 my_web:application
web_server.py(部分更新)
import select
import time
import socket
import sys
import re
import multiprocessing
class WSGIServer(object):
    """Small multi-process HTTP server with a WSGI-like application hook.

    Requests whose path ends in ``.html`` are handed to the ``app`` callable;
    every other path is served as a file read from ``documents_root``.
    """

    def __init__(self, port, documents_root, app):
        """Create, bind and start listening on the server socket.

        Args:
            port: TCP port number to listen on (all interfaces).
            documents_root: Directory prefix used for static files.
            app: Callable invoked as ``app(env, start_response)`` that
                returns the response body as ``str``.
        """
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind(("", port))
        self.server_socket.listen(128)
        self.documents_root = documents_root
        self.app = app

    def run_forever(self):
        """Accept connections forever, serving each one in its own process."""
        while True:
            new_socket, new_addr = self.server_socket.accept()
            # An idle keep-alive connection is dropped after 3 seconds.
            new_socket.settimeout(3)
            new_process = multiprocessing.Process(
                target=self.deal_with_request, args=(new_socket,)
            )
            new_process.start()
            # The child owns the connection now; release the parent's handle.
            new_socket.close()

    def deal_with_request(self, client_socket):
        """Serve requests on one connection until it closes or times out.

        Args:
            client_socket: Connected socket for a single client.
        """
        while True:
            try:
                request = client_socket.recv(1024).decode("utf-8")
            except (OSError, UnicodeDecodeError) as ret:
                # Covers the receive timeout as well as undecodable input.
                print("========>", ret)
                client_socket.close()
                return

            # An empty read means the peer closed the connection.
            if not request:
                client_socket.close()
                return

            request_lines = request.splitlines()
            for i, line in enumerate(request_lines):
                print(i, line)

            # Request line looks like: GET /a/b/index.html HTTP/1.1
            ret = re.match(r"([^/]*)([^ ]+)", request_lines[0])
            if not ret:
                # Without a path there is nothing to serve; previously this
                # fell through and used an unbound (or stale) file name.
                client_socket.close()
                return

            print("正则提取数据:", ret.group(1))
            print("正则提取数据:", ret.group(2))
            file_name = ret.group(2)
            if file_name == "/":
                file_name = "/index.html"

            if file_name.endswith(".html"):
                response = self._build_dynamic_response(file_name)
            else:
                response = self._build_static_response(file_name)

            try:
                # sendall() keeps writing until the whole response is out;
                # send() may stop after a partial write.
                client_socket.sendall(response)
            except OSError as ret:
                print("========>", ret)
                client_socket.close()
                return

    def _build_static_response(self, file_name):
        """Return the raw response bytes for a static file request."""
        try:
            print(self.documents_root + file_name)
            # Refuse ".." segments so a request cannot leave documents_root.
            if ".." in file_name.split("/"):
                raise OSError("path escapes the document root")
            with open(self.documents_root + file_name, "rb") as f:
                content = f.read()
        except (OSError, ValueError):
            # ValueError covers paths open() rejects outright (e.g. NUL bytes).
            response_body = "file not found, 请输入正确的url".encode("utf-8")
            response_header = "HTTP/1.1 404 not found\r\n"
            response_header += "Content-Type: text/html; charset=utf-8\r\n"
            # Content-Length is a byte count; the body is not pure ASCII.
            response_header += "Content-Length: %d\r\n" % (len(response_body))
            response_header += "\r\n"
            return response_header.encode("utf-8") + response_body

        response_header = "HTTP/1.1 200 OK\r\n"
        response_header += "Content-Length: %d\r\n" % (len(content))
        response_header += "\r\n"
        return response_header.encode("utf-8") + content

    def _build_dynamic_response(self, file_name):
        """Call the application for ``file_name`` and return response bytes."""
        env = dict()
        env['PATH_INFO'] = file_name
        # The app reports status and headers through set_response_headers,
        # which stores them on self.headers.
        response_body = self.app(env, self.set_response_headers)
        body_bytes = response_body.encode("utf-8")

        response_header = "HTTP/1.1 {status}\r\n".format(status=self.headers[0])
        response_header += "Content-Type: text/html; charset=utf-8\r\n"
        response_header += "Content-Length: %d\r\n" % len(body_bytes)
        for temp_head in self.headers[1]:
            response_header += "{0}:{1}\r\n".format(*temp_head)
        response_header += "\r\n"
        return response_header.encode("utf-8") + body_bytes

    def set_response_headers(self, status, headers):
        """Record the status line and headers supplied by the application.

        Passed to the application as its ``start_response`` callback.

        Args:
            status: Status text such as ``"200 OK"``.
            headers: Iterable of ``(name, value)`` header pairs.
        """
        response_header_default = [
            # Header name was misspelled "Data" and carried a raw epoch float.
            ("Date", time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())),
            ("Server", "ItCast-python mini web server")
        ]
        self.headers = [status, response_header_default + list(headers)]
# Directory passed to WSGIServer as the root for static files.
g_static_document_root = "./static"
# Directory appended to sys.path so the web framework module can be imported.
g_dynamic_document_root = "./dynamic"
def main():
    """Parse the command line, import the web application and run the server.

    Expected invocation: ``python3 web_server.py <port> <module>:<callable>``.
    When an argument is missing or malformed, the usage line is printed and
    the function returns without starting anything.
    """
    usage = "运行方式如: python3 xxx.py 7890 my_web_frame_name:application"

    if len(sys.argv) != 3:
        print(usage)
        return

    # A non-numeric port used to stay a string and only fail later in bind().
    port = sys.argv[1]
    if not port.isdecimal():
        print(usage)
        return
    port = int(port)

    # The application is given as "<module name>:<attribute name>"; a spec
    # without both parts used to end in a NameError further down.
    ret = re.match(r"([^:]*):(.*)", sys.argv[2])
    if not ret or not ret.group(1) or not ret.group(2):
        print(usage)
        return
    web_frame_module_name = ret.group(1)
    app_name = ret.group(2)

    print("http服务器使用的port:%s" % port)

    # Make the framework directory importable before importing the module.
    sys.path.append(g_dynamic_document_root)
    web_frame_module = __import__(web_frame_module_name)
    app = getattr(web_frame_module, app_name)

    http_server = WSGIServer(port, g_static_document_root, app)
    http_server.run_forever()
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
my_web.py(部分修改)
import time
import os
import re
# Directory prefix the page handlers prepend to the requested template path.
template_root = "./templates"
# Routing table: URL path -> handler function, filled in by the route decorator.
g_url_route = dict()
def route(url):
    """Return a decorator that registers a handler for ``url``.

    The undecorated handler is stored in ``g_url_route`` under ``url``; the
    decorator returns a thin wrapper that forwards ``file_name`` to it.

    Args:
        url: URL path (for example ``"/index.html"``) the handler serves.
    """
    import functools

    def func1(func):
        # Register the original function so dispatch skips the wrapper.
        g_url_route[url] = func

        # wraps() keeps the handler's name and docstring on the wrapper.
        @functools.wraps(func)
        def func2(file_name):
            return func(file_name)

        return func2

    return func1
@route("/index.html")  # ------- 修改------
def index(file_name):
    """Return the page content for index.html.

    Args:
        file_name: Requested URL path, used as the template path under
            ``template_root``.

    Returns:
        The template text with the ``{%content%}`` placeholder filled in, or
        the error text when the template cannot be opened or read.
    """
    # Maps a ".py" path onto the ".html" template; ".html" paths are unchanged.
    file_name = file_name.replace(".py", ".html")
    try:
        # The context manager closes the file even if reading fails. UTF-8 is
        # explicit so decoding does not depend on the platform locale.
        with open(template_root + file_name, encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError) as ret:
        return "%s" % ret

    data_from_mysql = "暂时没有数据,请等待学习mysql吧,学习完mysql之后,这里就可以放入mysql查询到的数据了"
    # The placeholder is a fixed literal, so plain replacement is enough and
    # the inserted text is never interpreted as a regex replacement template.
    return content.replace("{%content%}", data_from_mysql)
@route("/center.html")  # ------- 修改------
def center(file_name):
    """Return the page content for center.html.

    Args:
        file_name: Requested URL path, used as the template path under
            ``template_root``.

    Returns:
        The template text with the ``{%content%}`` placeholder filled in, or
        the error text when the template cannot be opened or read.
    """
    # Maps a ".py" path onto the ".html" template; ".html" paths are unchanged.
    file_name = file_name.replace(".py", ".html")
    try:
        # The context manager closes the file even if reading fails. UTF-8 is
        # explicit so decoding does not depend on the platform locale.
        with open(template_root + file_name, encoding="utf-8") as f:
            content = f.read()
    except (OSError, ValueError) as ret:
        return "%s" % ret

    data_from_mysql = "暂时没有数据,,,,~~~~(>_<)~~~~ "
    # The placeholder is a fixed literal, so plain replacement is enough and
    # the inserted text is never interpreted as a regex replacement template.
    return content.replace("{%content%}", data_from_mysql)
def application(environ, start_response):
    """WSGI-style entry point: dispatch ``PATH_INFO`` to a registered handler.

    Args:
        environ: Mapping that must contain the request path under
            ``'PATH_INFO'``.
        start_response: Callable invoked exactly once as
            ``start_response(status, response_headers)``.

    Returns:
        The response body as ``str``: the handler's output, a 404 message for
        an unregistered path, or the error text if the handler raised.
    """
    response_headers = [('Content-Type', 'text/html')]
    file_name = environ['PATH_INFO']

    handler = g_url_route.get(file_name)
    if handler is None:
        # The original 404 branch hung off try/else after a return and could
        # never run, and its "%s" placeholder was never filled in.
        start_response('404 Not Found', response_headers)
        return str(environ) + '-----404--->%s\n' % file_name

    try:
        body = handler(file_name)
    except Exception as ret:
        # Still answer with the error text, but no longer report it as 200 OK.
        start_response('500 Internal Server Error', response_headers)
        return "%s" % ret

    start_response('200 OK', response_headers)
    return body