models
from orm_control.ORM import Models, StringField, IntegerField
class User(Models):
    """Row mapping for an account record.

    No ``table_name`` is declared, so the table name is whatever the ORM
    metaclass derives for this class.
    """

    u_id = IntegerField('u_id', primary_key=True)
    username = StringField('username')
    password = StringField('password')
    # presumably distinguishes admin accounts from regular users -- TODO confirm
    user_type = StringField('user_type')
    # Integer membership flag.
    is_vip = IntegerField('is_vip')
    register_time = StringField('register_time')
class Movie(Models):
    """Row mapping for an uploaded movie file and its metadata."""

    m_id = IntegerField('m_id', primary_key=True)
    movie_name = StringField('movie_name')
    movie_size = StringField('movie_size')
    movie_path = StringField('movie_path')
    # Integer flag; truthy values are treated as "free to watch".
    is_free = IntegerField('is_free')
    # Id of the user associated with the movie (presumably the uploader).
    u_id = IntegerField('u_id')
    file_md5 = StringField('file_md5')
    upload_time = StringField('upload_time')
    # Integer soft-delete flag.
    is_delete = IntegerField('is_delete')
class Notice(Models):
    """Row mapping for an announcement."""

    n_id = IntegerField('n_id', primary_key=True)
    title = StringField('title')
    content = StringField('content')
    create_time = StringField('create_time')
    # Id of the user associated with the notice (presumably its author).
    u_id = IntegerField('u_id')
class DownloadRecord(Models):
    """Row mapping for one download event (which user fetched which movie)."""

    # Explicit table name; without it the ORM would fall back to a default.
    table_name = 'download_record'

    d_id = IntegerField('d_id', primary_key=True)
    m_id = IntegerField('m_id')
    u_id = IntegerField('u_id')
    download_time = StringField('download_time')
#SESSION_DATA
# In-memory table of login sessions shared by the whole server process.
# The login interface stores entries as: address string -> [session token, user id].
session_dict = {
}
# Lock guarding writes to session_dict. It is None at import time; the TCP
# server's run() replaces it with a threading.Lock before accepting clients.
mutex = None
ADMIN_API
from db.models import User, Movie, Notice
import datetime
from lib import common
import os
from conf import settings
@common.login_auth
def upload_movie_interface(back_dic, conn):
    """Receive a movie file from the client, store it and register it.

    ``back_dic`` supplies ``movie_name``, ``movie_size``, ``is_free`` and
    ``file_md5``; ``user_id`` is expected to have been added to it by the
    ``login_auth`` decorator.  The client is expected to stream exactly
    ``movie_size`` raw bytes over ``conn`` right after the request header.

    Args:
        back_dic: Decoded request dictionary.
        conn: Connected client socket.

    Raises:
        ValueError: ``movie_name`` has no usable file-name component.
        ConnectionError: the client disconnected before the whole file
            arrived (the partial file is removed first).
    """
    movie_name = back_dic.get('movie_name')
    # Only the last path component is used on disk so a crafted name such as
    # "../../x" cannot escape the upload directory.
    safe_name = os.path.basename(movie_name or '')
    if safe_name in ('', '.', '..'):
        raise ValueError('movie_name must contain a file name')
    movie_path = os.path.join(settings.MOVIE_FILES_PATH, safe_name)
    movie_size = back_dic.get('movie_size')

    received = 0
    complete = False
    try:
        with open(movie_path, 'wb') as f:
            while received < movie_size:
                # Never ask for more than what is still missing; otherwise
                # bytes of the client's next request could end up in the file.
                data = conn.recv(min(1024, movie_size - received))
                if not data:
                    # recv() returns b'' once the peer has closed; without
                    # this check the loop would spin forever.
                    raise ConnectionError(
                        'client disconnected during upload')
                f.write(data)
                received += len(data)
        complete = True
    finally:
        # Do not leave a truncated movie behind.
        if not complete and os.path.exists(movie_path):
            os.remove(movie_path)

    movie_obj = Movie(
        movie_name=movie_name,
        movie_size=movie_size,
        movie_path=movie_path,
        is_free=back_dic.get('is_free'),
        u_id=back_dic.get('user_id'),
        file_md5=back_dic.get('file_md5'),
        upload_time=datetime.datetime.now(),
        is_delete=0
    )
    movie_obj.orm_insert()
    send_dic = {'flag': True, 'msg': f'电影[{movie_name}]上传成功'}
    common.send_data(send_dic, conn)
@common.login_auth
def delete_movie_interface(back_dic, conn):
    """Soft-delete a movie by setting its ``is_delete`` flag to 1.

    Args:
        back_dic: Decoded request dictionary; ``movie_id`` selects the movie.
        conn: Connected client socket that receives the reply.

    The reply carries ``flag`` True on success.  When no movie has the given
    id the reply carries ``flag`` False instead of raising IndexError, which
    would otherwise terminate the client's connection.
    """
    movie_id = back_dic.get('movie_id')
    matches = Movie.orm_select(m_id=movie_id)
    if not matches:
        send_dic = {'flag': False, 'msg': '电影不存在'}
    else:
        movie_obj = matches[0]
        movie_obj.is_delete = 1
        movie_obj.orm_update()
        send_dic = {'flag': True,
                    'msg': f'电影[{movie_obj.movie_name}]删除成功!'}
    common.send_data(send_dic, conn)
@common.login_auth
def send_notice_interface(back_dic, conn):
    """Store a new notice built from the request and confirm to the client.

    Args:
        back_dic: Decoded request dictionary providing ``title``, ``content``
            and ``user_id``.
        conn: Connected client socket that receives the confirmation.
    """
    columns = {
        'title': back_dic.get('title'),
        'content': back_dic.get('content'),
        'create_time': datetime.datetime.now(),
        'u_id': back_dic.get('user_id'),
    }
    notice = Notice(**columns)
    notice.orm_insert()
    common.send_data({'msg': f'公告[{notice.title}]发布成功!'}, conn)
USER_API
from lib import common
from db import models
import datetime
@common.login_auth
def pay_vip_interface(back_dic, conn):
    """Set the requesting user's ``is_vip`` flag to 1 and confirm.

    Args:
        back_dic: Decoded request dictionary; ``user_id`` selects the user.
        conn: Connected client socket that receives the confirmation.
    """
    uid = back_dic.get('user_id')
    # The first matching row is used; an empty result raises IndexError.
    member = models.User.orm_select(u_id=uid)[0]
    member.is_vip = 1
    member.orm_update()
    common.send_data({'msg': '会员充值成功'}, conn)
@common.login_auth
def download_movie_interface(back_dic, conn):
    """Send a movie file to the client and record the download.

    A JSON header (``flag``, ``movie_size``, ``movie_name``) is sent first,
    followed by the raw file bytes.  If the movie id is unknown or the file
    cannot be opened, a ``flag`` False reply is sent and nothing else
    follows.

    Args:
        back_dic: Decoded request dictionary providing ``movie_id`` and
            ``user_id``.
        conn: Connected client socket.
    """
    movie_id = back_dic.get('movie_id')
    matches = models.Movie.orm_select(m_id=movie_id)
    if not matches:
        # Reply instead of raising IndexError on an unknown id.
        common.send_data({'flag': False, 'msg': '电影不存在'}, conn)
        return
    movie_obj = matches[0]

    # Open the file before announcing success so a missing file is reported
    # to the client rather than leaving it waiting for bytes that never come.
    try:
        movie_file = open(movie_obj.movie_path, 'rb')
    except OSError:
        common.send_data({'flag': False, 'msg': '电影文件不存在'}, conn)
        return

    with movie_file:
        send_dic = {
            'flag': True,
            'movie_size': movie_obj.movie_size,
            'movie_name': movie_obj.movie_name
        }
        common.send_data(send_dic, conn)
        # Fixed-size chunks: "lines" are meaningless in binary data, and
        # sendall() (unlike send()) keeps writing until every byte is out.
        for chunk in iter(lambda: movie_file.read(65536), b''):
            conn.sendall(chunk)

    record_obj = models.DownloadRecord(
        m_id=movie_id,
        u_id=back_dic.get('user_id'),
        download_time=datetime.datetime.now()
    )
    record_obj.orm_insert()
@common.login_auth
def check_record_interface(back_dic, conn):
    """Reply with the requesting user's download history.

    The reply carries ``flag`` True and ``record_list`` (a list of
    ``[movie_name, download_time_as_str]`` pairs), or ``flag`` False with a
    message when the user has no records.

    Args:
        back_dic: Decoded request dictionary; ``user_id`` selects the user.
        conn: Connected client socket that receives the reply.
    """
    records = models.DownloadRecord.orm_select(u_id=back_dic.get('user_id'))
    if records:
        # One movie lookup per record to resolve the display name.
        history = [
            [
                models.Movie.orm_select(m_id=rec.m_id)[0].movie_name,
                str(rec.download_time),
            ]
            for rec in records
        ]
        reply = {'flag': True, 'record_list': history}
    else:
        reply = {'flag': False, 'msg': '没有下载记录...'}
    common.send_data(reply, conn)
@common.login_auth
def check_notice_interface(back_dic, conn):
    """Reply with every stored notice.

    Each notice becomes a dictionary mapping its title to its content plus a
    ``create_time`` string.  When there are no notices the reply carries
    ``flag`` False and a message.

    Args:
        back_dic: Decoded request dictionary (not otherwise read here).
        conn: Connected client socket that receives the reply.
    """
    notices = models.Notice.orm_select()
    if notices:
        reply = {
            'flag': True,
            'notice_list': [
                {item.title: item.content,
                 'create_time': str(item.create_time)}
                for item in notices
            ],
        }
    else:
        reply = {'flag': False, 'msg': '没有公告,请等待楼主更新!!'}
    common.send_data(reply, conn)
COMMON_API
from lib import common
from db import models
import datetime
from db.session_data import session_dict
from db import session_data
def register_interface(back_dic, conn):
    """Create a user account unless the username is already taken.

    Args:
        back_dic: Decoded request dictionary providing ``username``,
            ``password`` and ``user_type``.
        conn: Connected client socket that receives the reply.
    """
    username = back_dic.get('username')
    password = back_dic.get('password')
    if not models.User.orm_select(username=username):
        # New accounts start with is_vip set to 0.
        account = models.User(
            username=username,
            password=password,
            user_type=back_dic.get('user_type'),
            is_vip=0,
            register_time=datetime.datetime.now(),
        )
        account.orm_insert()
        reply = {'flag': True, 'msg': '注册成功'}
    else:
        reply = {'flag': False, 'msg': '用户已存在'}
    common.send_data(reply, conn)
def login_interface(back_dic, conn):
    """Check credentials and, on success, open a session for the client.

    On success a fresh session token is stored in ``session_dict`` under the
    client's address together with the user id, and is returned to the
    client along with its VIP status.

    Args:
        back_dic: Decoded request dictionary providing ``username``,
            ``password`` and ``addr``.
        conn: Connected client socket that receives the reply.
    """
    candidates = models.User.orm_select(username=back_dic.get('username'))
    if candidates:
        account = candidates[0]
        if back_dic.get('password') == account.password:
            client_addr = back_dic.get('addr')
            token = common.get_session()
            # Session table writes happen under the shared mutex.
            with session_data.mutex:
                session_dict[client_addr] = [token, account.u_id]
            reply = {
                'flag': True,
                'msg': '登录成功',
                'session': token,
                'is_vip': bool(account.is_vip),
            }
        else:
            reply = {'flag': False, 'msg': '密码错误'}
    else:
        reply = {'flag': False, 'msg': '用户不存在'}
    common.send_data(reply, conn)
@common.login_auth
def check_movie_interface(back_dic, conn):
    """Tell the client whether a movie with the given MD5 is already stored.

    ``flag`` is True when no movie has that ``file_md5`` and False when one
    already exists.

    Args:
        back_dic: Decoded request dictionary providing ``file_md5``.
        conn: Connected client socket that receives the reply.
    """
    duplicates = models.Movie.orm_select(file_md5=back_dic.get('file_md5'))
    reply = (
        {'flag': False, 'msg': '电影已存在!'}
        if duplicates
        else {'flag': True, 'msg': '电影不存在'}
    )
    common.send_data(reply, conn)
@common.login_auth
def get_movie_list_interface(back_dic, conn):
    """Reply with the non-deleted movies matching the requested category.

    ``movie_type`` of ``'all'`` lists every non-deleted movie, ``'free'``
    only the free ones, and any other value only the paid ones.  Each entry
    is ``[movie_name, m_id, label]``.  When the movie table is empty the
    reply carries ``flag`` False; otherwise ``flag`` is True even if the
    filtered list ends up empty.

    Args:
        back_dic: Decoded request dictionary providing ``movie_type``.
        conn: Connected client socket that receives the reply.
    """
    all_movies = models.Movie.orm_select()
    if not all_movies:
        reply = {'flag': False, 'msg': '没有电影..'}
    else:
        wanted = back_dic.get('movie_type')
        listing = []
        for film in all_movies:
            if film.is_delete:
                continue
            if wanted == 'all':
                label = '免费' if film.is_free else '收费'
            elif wanted == 'free':
                if not film.is_free:
                    continue
                label = '免费'
            else:
                if film.is_free:
                    continue
                label = '收费'
            listing.append([film.movie_name, film.m_id, label])
        reply = {'flag': True, 'movie_list': listing}
    common.send_data(reply, conn)
COMMON
import json
import struct
import hashlib
import uuid
from db.session_data import session_dict
def send_data(send_dic, conn):
    """Send *send_dic* to the peer as a length-prefixed JSON message.

    The frame is a 4-byte header produced by ``struct.pack('i', ...)``
    holding the payload length, followed by the UTF-8 encoded JSON payload.

    Args:
        send_dic: JSON-serialisable dictionary to transmit.
        conn: Connected socket exposing ``sendall``.
    """
    json_data = json.dumps(send_dic).encode('utf-8')
    headers = struct.pack('i', len(json_data))
    # sendall() loops until everything is written; plain send() may transmit
    # only part of the buffer and would leave the receiver with a torn frame.
    conn.sendall(headers + json_data)
def get_session():
    """Return a new session token: the MD5 hex digest of a random UUID4 string."""
    random_id = str(uuid.uuid4())
    return hashlib.md5(random_id.encode('utf-8')).hexdigest()
def login_auth(func):
    """Decorator that only runs an interface function for logged-in clients.

    The wrapped function is called as ``func(back_dic, conn, ...)``.  The
    request's ``cookies`` value must equal the session token stored in
    ``session_dict`` for the request's ``addr``.  On success the stored user
    id is written to ``back_dic['user_id']`` and ``func`` is invoked.  On
    *any* failure (no session for the address, or a token mismatch) a
    ``flag`` False reply is sent, so the client is never left waiting for an
    answer.

    Args:
        func: Interface function taking the request dictionary and the
            client socket as its first two positional arguments.

    Returns:
        The wrapping function; it returns ``func``'s result, or None when
        authentication fails.
    """
    # Imported here so the block does not depend on the module import list.
    import functools

    @functools.wraps(func)
    def inner(*args, **kwargs):
        request = args[0]
        client_session = request.get('cookies')
        session_entry = session_dict.get(request.get('addr'))
        # session_entry is [session token, user id] when the client logged in.
        if session_entry and client_session == session_entry[0]:
            request['user_id'] = session_entry[1]
            return func(*args, **kwargs)
        send_dic = {'flag': False, 'msg': '请先登录...'}
        send_data(send_dic, args[1])
    return inner
MYSQL_CLIENT
import threading

import pymysql
class MySQLClient:
    """Process-wide singleton around one PyMySQL connection and dict cursor.

    ``MySQLClient()`` always returns the same object.  The connection is
    opened on first use (or again after ``close()``); repeated construction
    reuses it instead of opening and leaking a new connection every call.
    All cursor access is serialised with a lock because the single
    connection is shared by every caller.
    """

    __instance = None
    # Guards singleton creation, (re)connection and every use of the cursor.
    __lock = threading.RLock()

    def __new__(cls, *args, **kwargs):
        with cls.__lock:
            if cls.__instance is None:
                cls.__instance = object.__new__(cls)
        return cls.__instance

    def __init__(self):
        # __init__ runs on every MySQLClient() call even though __new__
        # returns the cached instance, so connect only when not connected.
        with self.__lock:
            if getattr(self, 'client', None) is not None:
                return
            # NOTE(review): credentials are hard-coded here; consider moving
            # them to configuration.
            self.client = pymysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='155345',
                database='youku',
                charset='utf8',
                autocommit=True
            )
            self.cursor = self.client.cursor(
                pymysql.cursors.DictCursor
            )

    def my_select(self, sql, value=None):
        """Run a query and return all rows fetched from the cursor.

        Args:
            sql: SQL text using ``%s`` placeholders.
            value: Placeholder arguments passed to ``cursor.execute``.
        """
        with self.__lock:
            self.cursor.execute(sql, value)
            return self.cursor.fetchall()

    def my_execute(self, sql, values):
        """Run a data-modifying statement.

        Errors are printed rather than raised, matching what callers of this
        method have always relied on.
        """
        with self.__lock:
            try:
                self.cursor.execute(sql, values)
            except Exception as e:
                print(e)

    def close(self):
        """Close cursor and connection; the next ``MySQLClient()`` reconnects."""
        with self.__lock:
            if getattr(self, 'client', None) is None:
                return
            self.cursor.close()
            self.client.close()
            # Reset so __init__ knows it has to open a fresh connection.
            self.cursor = None
            self.client = None
ORM
from orm_control.mysql_client import MySQLClient
class Field:
    """Description of a single table column.

    Attributes are stored exactly as given: column ``name``, SQL
    ``column_type``, the ``primary_key`` flag and the ``default`` value.
    """

    def __init__(self, name, column_type, primary_key, default):
        self.name, self.column_type = name, column_type
        self.primary_key, self.default = primary_key, default
class IntegerField(Field):
    """Column whose type defaults to ``int`` and whose default value is 0."""

    def __init__(self, name, column_type='int', primary_key=False, default=0):
        super().__init__(
            name=name,
            column_type=column_type,
            primary_key=primary_key,
            default=default,
        )
class StringField(Field):
    """Column whose type defaults to ``varchar(64)`` and whose default is None."""

    def __init__(self, name, column_type='varchar(64)', primary_key=False, default=None):
        super().__init__(
            name=name,
            column_type=column_type,
            primary_key=primary_key,
            default=default,
        )
class OrmMetaClass(type):
    """Metaclass that turns ``Field`` class attributes into table metadata.

    For every class except the one named ``Models`` it collects the
    ``Field`` attributes into ``mappings``, removes them from the class
    namespace, and stores ``table_name`` (the declared one, else the class
    name) and ``primary_key`` (the primary key field's column name).

    Raises:
        TypeError: if more than one primary key is declared, or none is.
    """

    def __new__(cls, class_name, class_base, class_dict):
        # The base class named 'Models' is created untouched.
        if class_name == 'Models':
            return super().__new__(cls, class_name, class_base, class_dict)

        resolved_table = class_dict.get('table_name', class_name)
        pk_name = None
        columns = {}
        for attr, candidate in class_dict.items():
            if not isinstance(candidate, Field):
                continue
            columns[attr] = candidate
            if not candidate.primary_key:
                continue
            if pk_name:
                raise TypeError('只能有一个主键!')
            pk_name = candidate.name

        # Drop the Field objects from the namespace; they stay reachable
        # through ``mappings``.
        for attr in columns:
            del class_dict[attr]

        if not pk_name:
            raise TypeError('必须有一个主键')

        class_dict.update(
            table_name=resolved_table,
            primary_key=pk_name,
            mappings=columns,
        )
        return super().__new__(cls, class_name, class_base, class_dict)
class Models(dict, metaclass=OrmMetaClass):
    """Dict-backed base class for table rows.

    Column values live in the dictionary itself and are also reachable as
    attributes.  Subclasses get ``table_name``, ``primary_key`` and
    ``mappings`` from the metaclass.
    """

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails; unknown names
        # yield None rather than raising AttributeError.
        return self.get(item)

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def orm_select(cls, **kwargs):
        """Return the rows matching every ``column=value`` filter given.

        With no filters all rows are returned.  Several filters are combined
        with ``and`` (previously all but the first were silently ignored).
        Values are always passed as a parameter list, so a ``None`` value is
        substituted too instead of leaving a literal ``%s`` in the statement.

        Column names are interpolated into the SQL text and must therefore
        come from trusted code, never from client input.

        Returns:
            A list of ``cls`` instances, one per row.
        """
        mysql = MySQLClient()
        sql = 'select * from %s' % cls.table_name
        values = None
        if kwargs:
            conditions = ' and '.join('%s=%%s' % column for column in kwargs)
            sql = '%s where %s' % (sql, conditions)
            values = list(kwargs.values())
        res = mysql.my_select(sql, values)
        return [cls(**d) for d in res]

    def orm_insert(self):
        """Insert this row, using every mapped column except the primary key.

        A column that was never set on the instance falls back to its
        field's ``default``.  (``getattr(self, name, default)`` could not do
        that because ``__getattr__`` never raises.)
        """
        mysql = MySQLClient()
        columns = []
        values = []
        for field in self.mappings.values():
            if field.primary_key:
                continue
            columns.append(field.name)
            values.append(self.get(field.name, field.default))
        sql = 'insert into %s(%s) values(%s)' % (
            self.table_name,
            ','.join(columns),
            ','.join(['%s'] * len(columns))
        )
        mysql.my_execute(sql, values)

    def orm_update(self):
        """Write every non-key column back to the row with this primary key.

        The primary-key value is sent as a bound parameter like all other
        values instead of being formatted into the SQL text.
        """
        mysql = MySQLClient()
        assignments = []
        values = []
        pk_column = None
        pk_value = None
        for field in self.mappings.values():
            if field.primary_key:
                pk_column = field.name
                pk_value = self.get(field.name)
            else:
                assignments.append('%s=%%s' % field.name)
                values.append(self.get(field.name))
        # The key value goes last to match the trailing where-clause placeholder.
        values.append(pk_value)
        sql = 'update %s set %s where %s=%%s' % (
            self.table_name,
            ','.join(assignments),
            pk_column
        )
        mysql.my_execute(sql, values)
SERVER
import socket
from conf import settings
from interface import admin_interface
from interface import common_interface
from interface import user_interface
import struct
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from db import session_data
# Worker pool with at most 100 threads; run() submits one working() task per
# accepted connection.
pool = ThreadPoolExecutor(100)
# Dispatch table: maps the request's 'type' string to the interface function
# that handles it. Every handler is called as handler(back_dic, conn).
func_dic = {
'register': common_interface.register_interface,
'login': common_interface.login_interface,
'check_movie': common_interface.check_movie_interface,
'upload_movie': admin_interface.upload_movie_interface,
'get_movie_list': common_interface.get_movie_list_interface,
'delete_movie': admin_interface.delete_movie_interface,
'send_notice': admin_interface.send_notice_interface,
'pay_vip': user_interface.pay_vip_interface,
'download_movie': user_interface.download_movie_interface,
'check_record': user_interface.check_record_interface,
'check_notice': user_interface.check_notice_interface
}
def run():
    """Start the TCP server and hand every accepted client to the pool.

    Installs the session mutex, binds to ``settings.ip`` / ``settings.port``
    and then accepts connections forever; this function does not return.
    """
    print('启动服务端')
    # The lock has to exist before any worker touches the session table.
    session_data.mutex = Lock()

    listener = socket.socket()
    listener.bind((settings.ip, settings.port))
    listener.listen(5)

    while True:
        client_conn, client_addr = listener.accept()
        print(client_addr)
        pool.submit(working, client_conn, client_addr)
def dispatcher(back_dic, conn):
    """Call the handler registered in ``func_dic`` for the request's ``type``.

    Requests whose type is not registered are ignored and get no reply.
    """
    handler = func_dic.get(back_dic.get('type'))
    if handler:
        handler(back_dic, conn)
def _recv_exact(conn, size):
    """Read exactly *size* bytes from *conn*; raise ConnectionError on EOF."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            # recv() returns b'' once the peer has closed the connection.
            raise ConnectionError('client closed the connection')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def working(conn, addr):
    """Serve one client until it disconnects or a request fails.

    Each request is a 4-byte ``struct`` ``'i'`` length header followed by
    that many bytes of UTF-8 JSON.  The client address is added to the
    decoded dictionary as ``addr`` before it is dispatched.  On any error
    the client's session entry (if there is one) is removed and the
    connection is closed.

    Args:
        conn: Connected client socket.
        addr: Client address as returned by ``accept``.
    """
    while True:
        try:
            # recv(n) may legally return fewer than n bytes, so both the
            # header and the payload are read with an exact-length loop.
            headers = _recv_exact(conn, 4)
            data_len = struct.unpack('i', headers)[0]
            json_data = _recv_exact(conn, data_len).decode('utf-8')
            back_dic = json.loads(json_data)
            back_dic['addr'] = str(addr)
            dispatcher(back_dic, conn)
        except Exception as e:
            print(e)
            # pop() with a default: a client that never logged in has no
            # entry, and a KeyError raised here would leave the mutex locked
            # for good and skip conn.close().
            with session_data.mutex:
                session_data.session_dict.pop(str(addr), None)
            conn.close()
            break
START
from socket_server import tcp_server
import os
import sys
# Put this script's directory on the module search path.
# NOTE(review): this runs after the socket_server import above, so it cannot
# influence how that import is resolved.
sys.path.append(os.path.dirname(__file__))

# Start the server only when executed as a script, not when imported.
if __name__ == '__main__':
    tcp_server.run()