优酷程序2

models

from orm_control.ORM import Models, StringField, IntegerField

class User(Models):
    """Row model for user accounts.

    No ``table_name`` is declared, so the metaclass falls back to the class
    name as the table name.
    """

    # Primary key; left out of the column list built by orm_insert.
    u_id = IntegerField('u_id', primary_key=True)
    # Login name; registration rejects a name that already exists.
    username = StringField('username')
    # Stored exactly as received from the client at registration.
    password = StringField('password')
    # Copied from the client's 'user_type' value at registration.
    user_type = StringField('user_type')
    # 0 at registration; set to 1 by the VIP payment interface.
    is_vip = IntegerField('is_vip')
    # Timestamp taken when the account is created.
    register_time = StringField('register_time')

class Movie(Models):
    """Row model for uploaded movies.

    No ``table_name`` is declared, so the metaclass falls back to the class
    name as the table name.
    """

    # Primary key; left out of the column list built by orm_insert.
    m_id = IntegerField('m_id', primary_key=True)
    # Name supplied by the uploading client.
    movie_name = StringField('movie_name')
    # Size in bytes as announced by the uploading client.
    movie_size = StringField('movie_size')
    # Location of the stored file on the server.
    movie_path = StringField('movie_path')
    # Truthy for free movies, falsy for paid ones.
    is_free = IntegerField('is_free')
    # Id of the user who uploaded the movie.
    u_id = IntegerField('u_id')
    # Content hash sent by the client; used to detect duplicate uploads.
    file_md5 = StringField('file_md5')
    # Timestamp taken when the upload finished.
    upload_time = StringField('upload_time')
    # Soft-delete marker: 0 on upload, 1 once an admin deletes the movie.
    is_delete = IntegerField('is_delete')

class Notice(Models):
    """Row model for announcements published by administrators.

    No ``table_name`` is declared, so the metaclass falls back to the class
    name as the table name.
    """

    # Primary key; left out of the column list built by orm_insert.
    n_id = IntegerField('n_id', primary_key=True)
    # Headline of the announcement.
    title = StringField('title')
    # Body text of the announcement.
    content = StringField('content')
    # Timestamp taken when the announcement is published.
    create_time = StringField('create_time')
    # Id of the user who published the announcement.
    u_id = IntegerField('u_id')


class DownloadRecord(Models):
    """Row model for the ``download_record`` table: one row per served download."""

    # Explicit table name; overrides the class-name default used by the metaclass.
    table_name = 'download_record'
    # Primary key; left out of the column list built by orm_insert.
    d_id = IntegerField(name='d_id', primary_key=True)
    # Id of the downloaded movie.
    m_id = IntegerField(name='m_id')
    # Id of the user who downloaded it.
    u_id = IntegerField(name='u_id')
    # Timestamp taken after the file has been sent.
    download_time = StringField(name='download_time')


# SESSION_DATA
# Module-level session store (it must not live inside the DownloadRecord
# class body): maps str(client address) -> [session token, user id].
session_dict = {}

# Lock guarding session_dict; the server replaces this placeholder with a
# threading.Lock when it starts.
mutex = None

ADMIN_API

from db.models import User, Movie, Notice
import datetime
from lib import common
import os
from conf import settings


@common.login_auth
def upload_movie_interface(back_dic, conn):
    """Receive a movie file from the client and register it in the database.

    Reads ``movie_name``, ``movie_size``, ``is_free``, ``file_md5`` and
    ``user_id`` from ``back_dic``, then reads exactly ``movie_size`` raw bytes
    from ``conn`` into a file under ``settings.MOVIE_FILES_PATH``.

    Raises:
        ValueError: the supplied movie name has no usable file-name component.
        ConnectionError: the client disconnected before sending the whole file.
    """
    movie_name = back_dic.get('movie_name')
    movie_size = back_dic.get('movie_size')

    # The name is client-supplied: keep only its last path component so that
    # something like '../../x' cannot escape the movie directory.
    safe_name = os.path.basename(movie_name)
    if safe_name in ('', '.', '..'):
        raise ValueError(f'invalid movie name: {movie_name!r}')

    movie_path = os.path.join(settings.MOVIE_FILES_PATH, safe_name)

    recv_data = 0
    try:
        with open(movie_path, 'wb') as f:
            while recv_data < movie_size:
                # Never ask for more than is left, otherwise the start of the
                # client's next request could end up inside the movie file.
                data = conn.recv(min(1024, movie_size - recv_data))
                if not data:
                    # recv() returns b'' once the peer has closed; without
                    # this check the loop would spin forever.
                    raise ConnectionError(
                        'client disconnected during upload')
                f.write(data)
                recv_data += len(data)
    except Exception:
        # Do not leave a truncated file behind.
        if os.path.isfile(movie_path):
            os.remove(movie_path)
        raise

    movie_obj = Movie(
        movie_name=movie_name,
        movie_size=movie_size,
        movie_path=movie_path,
        is_free=back_dic.get('is_free'),
        u_id=back_dic.get('user_id'),
        file_md5=back_dic.get('file_md5'),
        upload_time=datetime.datetime.now(),
        is_delete=0
    )

    movie_obj.orm_insert()

    send_dic = {'flag': True, 'msg': f'电影[{movie_name}]上传成功'}

    common.send_data(send_dic, conn)

@common.login_auth
def delete_movie_interface(back_dic, conn):
    """Soft-delete the movie whose id is ``back_dic['movie_id']``.

    The row is kept; only its ``is_delete`` flag is set to 1. When no movie
    has that id, a ``flag: False`` reply is sent instead of letting an
    IndexError tear down the client's connection.
    """
    movie_obj_list = Movie.orm_select(m_id=back_dic.get('movie_id'))

    if not movie_obj_list:
        common.send_data({'flag': False, 'msg': '电影不存在'}, conn)
        return

    movie_obj = movie_obj_list[0]
    movie_obj.is_delete = 1
    movie_obj.orm_update()

    send_dic = {'flag': True,
                'msg': f'电影[{movie_obj.movie_name}]删除成功!'}

    common.send_data(send_dic, conn)

@common.login_auth
def send_notice_interface(back_dic, conn):
    """Store a new announcement and confirm it to the client.

    ``title`` and ``content`` come from the request; ``user_id`` is the id
    injected by the login decorator.
    """
    notice_obj = Notice(
        title=back_dic.get('title'),
        content=back_dic.get('content'),
        create_time=datetime.datetime.now(),
        u_id=back_dic.get('user_id')
    )

    notice_obj.orm_insert()

    # Carry 'flag' like the other interface replies so a client can test
    # success the same way everywhere.
    send_dic = {
        'flag': True,
        'msg': f'公告[{notice_obj.title}]发布成功!'
    }

    common.send_data(send_dic, conn)

USER_API

from lib import common
from db import models
import datetime


@common.login_auth
def pay_vip_interface(back_dic, conn):
    """Mark the logged-in user as a VIP member and confirm it to the client.

    The user is looked up by the ``user_id`` injected by the login decorator.
    """
    user_obj = models.User.orm_select(
        u_id=back_dic.get('user_id'))[0]

    user_obj.is_vip = 1
    user_obj.orm_update()

    # Carry 'flag' like the other interface replies so a client can test
    # success the same way everywhere.
    send_dic = {
        'flag': True,
        'msg': '会员充值成功'
    }

    common.send_data(send_dic, conn)


@common.login_auth
def download_movie_interface(back_dic, conn):
    """Send a movie file to the client and log the download.

    Protocol: a JSON reply announcing ``movie_size`` and ``movie_name`` is
    sent first, followed by the raw file bytes. A download record is inserted
    after the file has been sent. When no movie has the requested id, a
    ``flag: False`` reply is sent and nothing else happens.
    """
    movie_id = back_dic.get('movie_id')

    movie_obj_list = models.Movie.orm_select(m_id=movie_id)
    if not movie_obj_list:
        common.send_data({'flag': False, 'msg': '电影不存在'}, conn)
        return

    movie_obj = movie_obj_list[0]

    send_dic = {
        'flag': True,
        'movie_size': movie_obj.movie_size,
        'movie_name': movie_obj.movie_name
    }

    common.send_data(send_dic, conn)

    # Read fixed-size chunks: a binary file has no meaningful "lines", and a
    # file without newline bytes would otherwise be loaded in one piece.
    # sendall() keeps writing until the whole chunk is out; send() may stop
    # early and silently drop the rest.
    chunk_size = 64 * 1024
    with open(movie_obj.movie_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            conn.sendall(chunk)

    record_obj = models.DownloadRecord(
        m_id=movie_id,
        u_id=back_dic.get('user_id'),
        download_time=datetime.datetime.now()
    )

    record_obj.orm_insert()



@common.login_auth
def check_record_interface(back_dic, conn):
    """Reply with the logged-in user's download history.

    The reply is either ``{'flag': False, 'msg': ...}`` when there are no
    records, or ``{'flag': True, 'record_list': [[movie_name, time], ...]}``.
    Exactly one reply is sent per request.
    """
    user_id = back_dic.get('user_id')
    record_obj_list = models.DownloadRecord.orm_select(u_id=user_id)

    if not record_obj_list:
        # Only build the reply here; the single send below delivers it.
        # Sending in this branch as well would emit the message twice and
        # desynchronise the client's next read.
        send_dic = {
            'flag': False,
            'msg': '没有下载记录...'
        }

    else:
        record_list = []
        for record_obj in record_obj_list:
            movie_obj = models.Movie.orm_select(m_id=record_obj.m_id)[0]

            record_list.append(
                [movie_obj.movie_name, str(record_obj.download_time)]
            )

        send_dic = {
            'flag': True,
            'record_list': record_list
        }

    common.send_data(send_dic, conn)

@common.login_auth
def check_notice_interface(back_dic, conn):
    """Reply with every stored announcement, or a failure message if none exist.

    Each announcement is sent as a dict mapping its title to its content,
    plus a 'create_time' entry holding the stringified timestamp.
    """
    notices = models.Notice.orm_select()

    if notices:
        reply = {
            'flag': True,
            'notice_list': [
                {entry.title: entry.content,
                 'create_time': str(entry.create_time)}
                for entry in notices
            ]
        }
    else:
        reply = {
            'flag': False,
            'msg': '没有公告,请等待楼主更新!!'
        }

    common.send_data(reply, conn)

COMMON_API

from lib import common
from db import models
import datetime
from db.session_data import session_dict
from db import session_data


def register_interface(back_dic, conn):
    """Create a user account unless the requested username is already taken.

    Replies with ``flag: False`` when a user with that name exists, otherwise
    inserts the new user (non-VIP, registered now) and replies ``flag: True``.
    """
    # NOTE(review): the password is stored exactly as received.
    requested_name = back_dic.get('username')
    requested_password = back_dic.get('password')

    if models.User.orm_select(username=requested_name):
        reply = {'flag': False, 'msg': '用户已存在'}
    else:
        new_user = models.User(
            username=requested_name,
            password=requested_password,
            user_type=back_dic.get('user_type'),
            is_vip=0,
            register_time=datetime.datetime.now(),
        )
        new_user.orm_insert()
        reply = {'flag': True, 'msg': '注册成功'}

    common.send_data(reply, conn)


def login_interface(back_dic, conn):
    """Check the client's credentials and open a session on success.

    On success a fresh session token is stored in ``session_dict`` under the
    client's address as ``[token, user id]`` and returned to the client
    together with its VIP status. Unknown users and wrong passwords get a
    ``flag: False`` reply.
    """
    user_obj_list = models.User.orm_select(username=back_dic.get('username'))

    if not user_obj_list:
        send_dic = {'flag': False, 'msg': '用户不存在'}

    else:
        user_obj = user_obj_list[0]

        if back_dic.get('password') == user_obj.password:
            session = common.get_session()

            # `with` releases the lock even if the update raises; a bare
            # acquire()/release() pair would leave it held in that case.
            with session_data.mutex:
                session_dict[back_dic.get('addr')] = [session, user_obj.u_id]

            send_dic = {'flag': True,
                        'msg': '登录成功',
                        'session': session,
                        'is_vip': bool(user_obj.is_vip)
                        }

        else:
            send_dic = {'flag': False, 'msg': '密码错误'}

    common.send_data(send_dic, conn)


@common.login_auth
def check_movie_interface(back_dic, conn):
    """Tell the client whether a movie with the given MD5 is already stored.

    ``flag`` is True when no stored movie has that ``file_md5`` and False
    when one does.
    """
    matches = models.Movie.orm_select(file_md5=back_dic.get('file_md5'))

    if matches:
        reply = {'flag': False, 'msg': '电影已存在!'}
    else:
        reply = {'flag': True, 'msg': '电影不存在'}

    common.send_data(reply, conn)


@common.login_auth
def get_movie_list_interface(back_dic, conn):
    """Reply with the non-deleted movies matching ``back_dic['movie_type']``.

    ``'all'`` lists every movie, ``'free'`` only free ones, and any other
    value only paid ones. Each entry is ``[name, id, price label]``. When the
    movie table is empty the reply is a ``flag: False`` message instead.
    """
    stored_movies = models.Movie.orm_select()

    if not stored_movies:
        common.send_data({
            'flag': False, 'msg': '没有电影..'}, conn)
        return

    wanted = back_dic.get('movie_type')
    rows = []

    for movie in stored_movies:
        # Soft-deleted movies are never listed.
        if movie.is_delete:
            continue

        free = bool(movie.is_free)
        if wanted == 'all':
            selected = True
        elif wanted == 'free':
            selected = free
        else:
            selected = not free

        if selected:
            rows.append(
                [movie.movie_name, movie.m_id, '免费' if free else '收费']
            )

    common.send_data({
        'flag': True, 'movie_list': rows}, conn)

COMMON

import json
import struct
import hashlib
import uuid
from db.session_data import session_dict


def send_data(send_dic, conn):
    """Send ``send_dic`` to ``conn`` as a length-prefixed JSON message.

    Wire format: a 4-byte header packed with ``struct`` format ``'i'`` holding
    the payload length, followed by the UTF-8 encoded JSON payload.
    """
    json_data = json.dumps(send_dic).encode('utf-8')
    headers = struct.pack('i', len(json_data))

    # sendall() keeps writing until every byte is out; send() may transmit
    # only part of the buffer, which would corrupt the message framing.
    conn.sendall(headers + json_data)


def get_session():
    """Return a fresh session token: the MD5 hex digest of a random UUID4."""
    random_id = str(uuid.uuid4())
    return hashlib.md5(random_id.encode('utf-8')).hexdigest()


def login_auth(func):
    """Decorator that lets only logged-in clients reach an interface function.

    The wrapped function is called as ``func(back_dic, conn)``. The request's
    ``cookies`` value must equal the session token stored for its ``addr`` in
    ``session_dict``; on a match the stored user id is written to
    ``back_dic['user_id']`` and ``func`` runs. In every other case (no
    session, or a token that does not match) the client receives a
    "please log in" reply, so it is never left waiting for an answer.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)
    def inner(*args, **kwargs):
        back_dic = args[0]
        session_id_list = session_dict.get(back_dic.get('addr'))

        if session_id_list and back_dic.get('cookies') == session_id_list[0]:
            back_dic['user_id'] = session_id_list[1]
            return func(*args, **kwargs)

        # Reached for a missing session AND for a mismatching token.
        send_dic = {'flag': False, 'msg': '请先登录...'}
        send_data(send_dic, args[1])

    return inner

MYSQL_CLIENT (orm_control/mysql_client.py)

import threading

import pymysql


class MySQLClient:
    """Process-wide MySQL connection shared by every ORM call.

    ``MySQLClient()`` always returns the same instance. The connection is
    opened on first use (and again after ``close()``), not on every call:
    re-running the connect code each time would open a new connection per
    query and drop the previous one without closing it.

    All cursor access is serialised with a lock because the one cursor is
    shared by every thread that calls into the ORM.
    """

    __instance = None
    # Re-entrant so that code already holding the lock can construct or use
    # the client again without deadlocking.
    __lock = threading.RLock()

    def __new__(cls, *args, **kwargs):
        with cls.__lock:
            if cls.__instance is None:
                cls.__instance = object.__new__(cls)
        return cls.__instance

    def __init__(self):
        with self.__lock:
            # __init__ runs on every MySQLClient() call even though __new__
            # returns the existing instance; connect only when needed.
            if self.__dict__.get('client') is not None:
                return

            # NOTE(review): credentials are hard-coded here; consider moving
            # them to configuration.
            self.client = pymysql.connect(
                host='127.0.0.1',
                port=3306,
                user='root',
                password='155345',
                database='youku',
                charset='utf8',
                autocommit=True
            )

            self.cursor = self.client.cursor(
                pymysql.cursors.DictCursor
            )

    def my_select(self, sql, value=None):
        """Run a query and return all rows as dicts (DictCursor)."""
        with self.__lock:
            # The connection is long-lived now, so revive it if the server
            # dropped it while idle.
            self.client.ping(reconnect=True)
            self.cursor.execute(sql, value)
            return self.cursor.fetchall()

    def my_execute(self, sql, values):
        """Run a write statement; best effort.

        Errors are printed rather than raised, as before. Returns True on
        success and False on failure so callers that care can check.
        """
        try:
            with self.__lock:
                self.client.ping(reconnect=True)
                self.cursor.execute(sql, values)

        except Exception as e:
            print(e)
            return False

        return True

    def close(self):
        """Close cursor and connection; the next MySQLClient() reconnects."""
        with self.__lock:
            self.cursor.close()
            self.client.close()
            self.cursor = None
            self.client = None

ORM (orm_control/ORM.py)

from orm_control.mysql_client import MySQLClient

class Field:
    """Description of one table column as declared on a model class."""

    def __init__(self, name, column_type, primary_key, default):
        # Column name and SQL type string.
        self.name, self.column_type = name, column_type
        # Whether this column is the primary key, and the declared default.
        self.primary_key, self.default = primary_key, default

class IntegerField(Field):
    """Integer column: type 'int', not a primary key, default 0 unless overridden."""

    def __init__(self, name, column_type='int', primary_key=False, default=0):
        super().__init__(
            name=name,
            column_type=column_type,
            primary_key=primary_key,
            default=default,
        )


class StringField(Field):
    """String column: type 'varchar(64)', not a primary key, default None unless overridden."""

    def __init__(self, name, column_type='varchar(64)', primary_key=False, default=None):
        super().__init__(
            name=name,
            column_type=column_type,
            primary_key=primary_key,
            default=default,
        )


class OrmMetaClass(type):
    """Metaclass that turns Field attributes of a model into table metadata.

    For every class except the one named 'Models' it moves all Field
    attributes out of the class namespace into ``mappings``, and records
    ``table_name`` (the declared one, else the class name) and
    ``primary_key`` (the column name of the single primary-key field).

    Raises:
        TypeError: more than one primary-key field, or none at all.
    """

    def __new__(cls, class_name, class_base, class_dict):
        # The abstract base itself declares no columns; build it untouched.
        if class_name == 'Models':
            return type.__new__(cls, class_name, class_base, class_dict)

        resolved_table = class_dict.get('table_name', class_name)
        pk_column = None
        fields = {}

        for attr_name, attr_value in class_dict.items():
            if not isinstance(attr_value, Field):
                continue

            fields[attr_name] = attr_value

            if not attr_value.primary_key:
                continue
            if pk_column:
                raise TypeError('只能有一个主键!')
            pk_column = attr_value.name

        # Remove the Field objects so instance lookups fall through to the
        # model's own attribute handling instead of finding them on the class.
        for attr_name in fields:
            class_dict.pop(attr_name)

        if not pk_column:
            raise TypeError('必须有一个主键')

        class_dict.update(
            table_name=resolved_table,
            primary_key=pk_column,
            mappings=fields,
        )
        return type.__new__(cls, class_name, class_base, class_dict)


class Models(dict, metaclass=OrmMetaClass):
    """Base class for table models: a dict whose keys read like attributes.

    Subclasses declare Field attributes; the metaclass supplies
    ``table_name``, ``primary_key`` and ``mappings``. Column values live in
    the dict itself.
    """

    def __getattr__(self, item):
        # Only reached when normal attribute lookup fails; a missing column
        # reads as None instead of raising AttributeError.
        return self.get(item)

    def __setattr__(self, key, value):
        self[key] = value

    @classmethod
    def orm_select(cls, **kwargs):
        """Return the rows matching every ``column=value`` filter as model objects.

        With no filters the whole table is returned. Several filters are
        combined with AND. Values are passed to the driver as parameters;
        column names are interpolated and must come from trusted code.
        """
        mysql = MySQLClient()

        sql = f'select * from {cls.table_name}'
        values = None

        if kwargs:
            conditions = ' and '.join(f'{column}=%s' for column in kwargs)
            sql = f'{sql} where {conditions}'
            values = list(kwargs.values())

        res = mysql.my_select(sql, values)

        return [cls(**row) for row in res]

    def orm_insert(self):
        """Insert this object as a new row; the primary-key column is omitted."""
        mysql = MySQLClient()
        columns = []
        values = []

        for field in self.mappings.values():
            if field.primary_key:
                continue

            columns.append(field.name)
            # Read the dict directly: getattr() never falls back to a default
            # here because __getattr__ returns None for missing keys, so the
            # field's declared default would never be applied.
            values.append(self.get(field.name, field.default))

        column_list = ','.join(columns)
        placeholders = ','.join(['%s'] * len(columns))
        sql = f'insert into {self.table_name}({column_list}) values({placeholders})'

        mysql.my_execute(sql, values)

    def orm_update(self):
        """Write every non-key column of this object to its row, found by primary key."""
        mysql = MySQLClient()
        assignments = []
        values = []
        pk_field = None

        for field in self.mappings.values():
            if field.primary_key:
                pk_field = field
            else:
                assignments.append(f'{field.name}=%s')
                values.append(self.get(field.name))

        # The key value is bound as a parameter like the other values, so it
        # is quoted and escaped by the driver instead of being pasted into
        # the SQL text.
        values.append(self.get(pk_field.name))

        assignment_list = ','.join(assignments)
        sql = f'update {self.table_name} set {assignment_list} where {pk_field.name}=%s'

        mysql.my_execute(sql, values)

SERVER

import socket
from conf import settings
from interface import admin_interface
from interface import common_interface
from interface import user_interface
import struct
import json
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from db import session_data

# Worker pool: each accepted client connection is served by one pool thread.
pool = ThreadPoolExecutor(100)


# Dispatch table: maps the 'type' value of an incoming request to the
# interface function that handles it. Every handler is called as
# handler(back_dic, conn).
func_dic = {
    'register': common_interface.register_interface,
    'login': common_interface.login_interface,
    'check_movie': common_interface.check_movie_interface,
    'upload_movie': admin_interface.upload_movie_interface,
    'get_movie_list': common_interface.get_movie_list_interface,
    'delete_movie': admin_interface.delete_movie_interface,
    'send_notice': admin_interface.send_notice_interface,
    'pay_vip': user_interface.pay_vip_interface,
    'download_movie': user_interface.download_movie_interface,
    'check_record': user_interface.check_record_interface,
    'check_notice': user_interface.check_notice_interface
}


def run():
    """Start the TCP server and hand each accepted client to the worker pool.

    Installs the lock that guards the session store, binds to the address
    from settings, and then accepts connections forever.
    """
    print('启动服务端')
    session_data.mutex = Lock()

    listen_address = (settings.ip, settings.port)
    listener = socket.socket()
    listener.bind(listen_address)
    listener.listen(5)

    while True:
        client_conn, client_addr = listener.accept()
        print(client_addr)
        # One pool task serves the connection for its whole lifetime.
        pool.submit(working, client_conn, client_addr)

def dispatcher(back_dic, conn):
    """Route a decoded request to the handler registered for its 'type'.

    Requests whose type is not in ``func_dic`` are ignored.
    """
    handler = func_dic.get(back_dic.get('type'))

    if handler is not None:
        handler(back_dic, conn)


def _recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn``; raise ConnectionError on EOF."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            # recv() returns b'' once the peer has closed the connection.
            raise ConnectionError('client closed the connection')
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)


def working(conn, addr):
    """Serve one client: read length-prefixed JSON requests and dispatch them.

    Each request is a 4-byte ``struct`` ``'i'`` length header followed by
    that many bytes of UTF-8 JSON. The client's address is added to the
    request as ``addr`` before dispatch. Any error ends the loop; the
    client's session entry is then removed and the connection closed.
    """
    client_key = str(addr)

    try:
        while True:
            # A single recv() may return fewer bytes than requested, so both
            # the header and the body are read until complete.
            headers = _recv_exact(conn, 4)
            data_len = struct.unpack('i', headers)[0]
            json_data = _recv_exact(conn, data_len).decode('utf-8')

            back_dic = json.loads(json_data)
            back_dic['addr'] = client_key
            dispatcher(back_dic, conn)

    except Exception as e:
        print(e)

    finally:
        # pop() with a default: a client that never logged in has no entry,
        # and a KeyError raised while holding the mutex would leave it locked
        # for every later login.
        with session_data.mutex:
            session_data.session_dict.pop(client_key, None)
        conn.close()

START

import os
import sys

# Put the project root on sys.path BEFORE importing project packages;
# extending the path after the import cannot help that import.
# abspath() guards against __file__ being a bare relative file name, whose
# dirname would be the empty string.
sys.path.append(
    os.path.dirname(os.path.abspath(__file__))
)

from socket_server import tcp_server

if __name__ == '__main__':
    tcp_server.run()
    
上一篇:【C#】Excel舍入函数Round、RoundUp、RoundDown的C#版


下一篇:scrapy把数据保存到mongodb