逐步引入:
1. 最简单的web server
#!/usr/bin/env python
# coding:utf-8 import socket sk = socket.socket() sk.bind(("127.0.0.1", 8000)) sk.listen() while 1:
conn, _ = sk.accept() # py中使用单下划线代表没有用到的变量 data = conn.recv(8096) print(data) conn.send(b'http/1.1 200 OK\r\ncontent-type:text/html; charset=utf-8\r\n\r\n')
conn.send(b'<h2>Hello,world.</h2>') conn.close()
sk.close() # 以上是所有浏览器的本质
2.不同路径的响应:
#!/usr/bin/env python
# coding:utf-8 import socket sk = socket.socket() sk.bind(("127.0.0.1", 8000)) sk.listen() # 写一个死循环,一直等待客户端来连我
while 1:
# 获取与客户端的连接
conn, _ = sk.accept()
# 接收客户端发来消息
data = conn.recv(8096)
# 把收到的数据转成字符串类型
data_str = str(data, encoding="utf-8") # bytes("str", enconding="utf-8")
# print(data_str)
# 用\r\n去切割上面的字符串
l1 = data_str.split("\r\n")
# print(l1[0])
# 按照空格切割上面的字符串
l2 = l1[0].split()
url = l2[1]
# 给客户端回复消息
conn.send(b'http/1.1 200 OK\r\ncontent-type:text/html; charset=utf-8\r\n\r\n')
# 想让浏览器在页面上显示出来的内容都是响应正文 # 根据不同的url返回不同的内容
if url == "/yimi/":
response = b'<h1>hello yimi!</h1>'
elif url == "/xiaohei/":
response = b'<h1>hello xiaohei!</h1>'
else:
response = b'<h1>404! not found!</h1>'
conn.send(response)
# 关闭
conn.close()
# sk.close()
3.函数版server
#!/usr/bin/env python
# coding:utf-8 """
完善的web服务端示例
函数版根据不同的路径返回不同的内容
""" import socket # 生成socket实例对象
sk = socket.socket()
# 绑定IP和端口
sk.bind(("127.0.0.1", 8001))
# 监听
sk.listen() # 定义一个处理/yimi/的函数
def yimi(url):
ret = 'hello {}'.format(url)
return bytes(ret, encoding="utf-8") # 定义一个处理/xiaohei/的函数
def xiaohei(url):
ret = '你好。 {}'.format(url)
return bytes(ret, encoding="utf-8") # 写一个死循环,一直等待客户端来连我
while 1:
# 获取与客户端的连接
conn, _ = sk.accept()
# 接收客户端发来消息
data = conn.recv(8096)
# 把收到的数据转成字符串类型
data_str = str(data, encoding="utf-8") # bytes("str", enconding="utf-8")
# print(data_str)
# 用\r\n去切割上面的字符串
l1 = data_str.split("\r\n")
# print(l1[0])
# 按照空格切割上面的字符串
l2 = l1[0].split()
url = l2[1]
# 给客户端回复消息
conn.send(b'http/1.1 200 OK\r\ncontent-type:text/html; charset=utf-8\r\n\r\n')
# 想让浏览器在页面上显示出来的内容都是响应正文 # 根据不同的url返回不同的内容
if url == "/yimi/":
response = yimi(url)
elif url == "/xiaohei/":
response = xiaohei(url)
else:
response = b'<h1>404! not found!</h1>'
conn.send(response)
# 关闭
conn.close()
4.进阶版server
#!/usr/bin/env python
# coding:utf-8 """
完善的web服务端示例
函数版根据不同的路径返回不同的内容
进阶函数版 不写if判断了,用url名字去找对应的函数名
""" import socket # 生成socket实例对象
sk = socket.socket()
# 绑定IP和端口
sk.bind(("127.0.0.1", 8001))
# 监听
sk.listen() # 定义一个处理/yimi/的函数
def yimi(url):
ret = 'hello {}'.format(url)
return bytes(ret, encoding="utf-8") # 定义一个处理/xiaohei/的函数
def xiaohei(url):
ret = 'hello {}'.format(url)
return bytes(ret, encoding="utf-8") # 定义一个专门用来处理404的函数
def f404(url):
ret = "你访问的这个{} 找不到".format(url)
return bytes(ret, encoding="utf-8") url_func = [
("/yimi/", yimi),
("/xiaohei/", xiaohei),
] # 写一个死循环,一直等待客户端来连我
while 1:
# 获取与客户端的连接
conn, _ = sk.accept()
# 接收客户端发来消息
data = conn.recv(8096)
# 把收到的数据转成字符串类型
data_str = str(data, encoding="utf-8") # bytes("str", enconding="utf-8")
# print(data_str)
# 用\r\n去切割上面的字符串
l1 = data_str.split("\r\n")
# print(l1[0])
# 按照空格切割上面的字符串
l2 = l1[0].split()
url = l2[1]
# 给客户端回复消息
conn.send(b'http/1.1 200 OK\r\ncontent-type:text/html; charset=utf-8\r\n\r\n')
# 想让浏览器在页面上显示出来的内容都是响应正文 # 根据不同的url返回不同的内容
# 去url_func里面找对应关系
for i in url_func:
if i[0] == url:
func = i[1]
break
# 找不到对应关系就默认执行f404函数
else:
func = f404
# 拿到函数的执行结果
response = func(url)
# 将函数返回的结果发送给浏览器
conn.send(response)
# 关闭连接
conn.close()
5.返回动态html 其中yimi.html中有个内容是:<p>@@xx@@</p> 在响应时将被先替换再返回
"""
完善的web服务端示例
函数版根据不同的路径返回不同的内容
进阶函数版 不写if判断了,用url名字去找对应的函数名
返回html页面
返回动态的html页面
""" import socket # 生成socket实例对象
sk = socket.socket()
# 绑定IP和端口
sk.bind(("127.0.0.1", 8001))
# 监听
sk.listen() # 定义一个处理/yimi/的函数
def yimi(url):
with open("yimi.html", "r", encoding="utf-8") as f:
ret = f.read()
import time
# 得到替换后的字符串
ret2 = ret.replace("@@xx@@", str(time.time()))
return bytes(ret2, encoding="utf-8") # 定义一个处理/xiaohei/的函数
def xiaohei(url):
with open("xiaohei.html", "rb") as f:
ret = f.read()
return ret # 定义一个专门用来处理404的函数
def f404(url):
ret = "你访问的这个{} 找不到".format(url)
return bytes(ret, encoding="utf-8") url_func = [
("/yimi/", yimi),
("/xiaohei/", xiaohei),
] # 写一个死循环,一直等待客户端来连我
while 1:
# 获取与客户端的连接
conn, _ = sk.accept()
# 接收客户端发来消息
data = conn.recv(8096)
# 把收到的数据转成字符串类型
data_str = str(data, encoding="utf-8") # bytes("str", enconding="utf-8")
# print(data_str)
# 用\r\n去切割上面的字符串
l1 = data_str.split("\r\n")
# print(l1[0])
# 按照空格切割上面的字符串
l2 = l1[0].split()
url = l2[1]
# 给客户端回复消息
conn.send(b'http/1.1 200 OK\r\ncontent-type:text/html; charset=utf-8\r\n\r\n')
# 想让浏览器在页面上显示出来的内容都是响应正文 # 根据不同的url返回不同的内容
# 去url_func里面找对应关系
for i in url_func:
if i[0] == url:
func = i[1]
break
# 找不到对应关系就默认执行f404函数
else:
func = f404
# 拿到函数的执行结果
response = func(url)
# 将函数返回的结果发送给浏览器
conn.send(response)
# 关闭连接
conn.close()
6.使用wsgiref模块
"""
根据URL中不同的路径返回不同的内容--函数进阶版
返回HTML页面
让网页动态起来
wsgiref模块版
""" import time
from wsgiref.simple_server import make_server # 将返回不同的内容部分封装成函数
def yimi(url):
with open("yimi.html", "r", encoding="utf8") as f:
s = f.read()
now = str(time.time())
s = s.replace("@@xx@@", now)
return bytes(s, encoding="utf8") def xiaohei(url):
with open("xiaohei.html", "r", encoding="utf8") as f:
s = f.read()
return bytes(s, encoding="utf8") # 定义一个url和实际要执行的函数的对应关系
list1 = [
("/yimi/", yimi),
("/xiaohei/", xiaohei),
] def run_server(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf8'), ]) # 设置HTTP响应的状态码和头信息
url = environ['PATH_INFO'] # 取到用户输入的url
func = None
for i in list1:
if i[0] == url:
func = i[1]
break
if func:
response = func(url)
else:
response = b"404 not found!"
return [response, ] if __name__ == '__main__':
httpd = make_server('127.0.0.1', 8090, run_server)
print("我在8090等你哦...")
httpd.serve_forever()
7.使用jinja2
from wsgiref.simple_server import make_server
from jinja2 import Template def index():
with open("jinja2test.html", "r", encoding="utf-8") as f:
data = f.read()
template = Template(data) # 生成模板文件
# 从数据库中取数据
import pymysql conn = pymysql.connect(
host="192.168.112.13",
port=3306,
user="root",
password="",
database="test",
charset="utf8",
)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
cursor.execute("select * from users;")
user_list = cursor.fetchall()
# 实现字符串的替换
ret = template.render({"user_list": user_list}) # 把数据填充到模板里面
return [bytes(ret, encoding="utf8"), ] def home():
with open("home.html", "rb") as f:
data = f.read()
return [data, ] # 定义一个url和函数的对应关系
URL_LIST = [
("/index/", index),
("/home/", home),
] def run_server(environ, start_response):
start_response('200 OK', [('Content-Type', 'text/html;charset=utf8'), ]) # 设置HTTP响应的状态码和头信息
url = environ['PATH_INFO'] # 取到用户输入的url
func = None # 将要执行的函数
for i in URL_LIST:
if i[0] == url:
func = i[1] # 去之前定义好的url列表里找url应该执行的函数
break
if func: # 如果能找到要执行的函数
return func() # 返回函数的执行结果
else:
return [bytes("404没有该页面", encoding="utf8"), ] if __name__ == '__main__':
httpd = make_server('', 8000, run_server)
print("Serving HTTP on port 8000...")
httpd.serve_forever()
其中jinja2test.html内容:
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta http-equiv="x-ua-compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>Title</title>
</head>
<body> <table border="1">
<thead>
<tr>
<th>ID</th>
<th>用户名</th>
<th>性别</th>
<th>年龄</th>
<th>部门</th>
</tr>
</thead>
<tbody>
{% for user in user_list %}
<tr>
<td>{{user.sno}}</td>
<td>{{user.sname}}</td>
<td>{{user.sgender}}</td>
<td>{{user.sage}}</td>
<td>{{user.sdept}}</td>
</tr>
{% endfor %}
</tbody>
</table>
</body>
</html>