ATM+购物车(部分代码——三层框架)

启动文件

run.py

from core import src

if __name__ == '__main__':
    src.main()

用户视图层

src.py

# 用户视图层
from interface import user_interface
from interface import bank_interface
from interface import shopping_interface
from lib import common


# 记录用户登录状态
login_user = None

# 1、注册功能
@common.logger('注册')
def register():
    while True:
        inp_user = input("请输入用户名:").strip()
        inp_pwd = input("请输入密码:").strip()
        inp_pwd2 = input("请确认密码:").strip()
        if not (inp_user and inp_pwd and inp_pwd2):
            print("输入不能为空")
            continue
        if inp_pwd != inp_pwd2:
            print("两次密码不一致")
            continue
        flag, msg = user_interface.register_interface(inp_user, inp_pwd)
        if flag:
            print(msg)
            break
        else:
            print(msg)


# 2、登录功能
@common.logger('登录')
def login():
    while True:
        inp_user = input("请输入用户名:").strip()
        inp_pwd = input("请输入密码:").strip()
        if not (inp_user and inp_pwd):
            print("输入不能为空")
            continue
        flog, msg = user_interface.login_interface(inp_user, inp_pwd)
        if flog:
            global login_user
            login_user = inp_user
            print(f'{msg},欢迎您[{login_user}]')
            break
        else:
            print(msg)


# 3、查看余额
@common.logger('查看余额')
def check_balance():
    bal = bank_interface.check_balance_interface(login_user)
    print(f'用户[{login_user}],您的账户余额为:{bal}元')
    print("==================  END  ==================")


# 4、提现功能
@common.logger('提现')
def withdraw():
    inp_money = input("请输入提现金额【会自动扣除5%手续费】:").strip()
    if not inp_money.isdigit():
        print("提现失败,输入错误")
        return
    flog, msg = bank_interface.withdraw_interface(login_user, float(inp_money))
    if flog:
        print(msg)
    else:
        print(msg)


# 5、还款功能
@common.logger('还款')
def repayment():
    inp_money = input("请输入还款金额:").strip()
    if not inp_money.isdigit():
        print("还款失败,输入错误")
        return
    flog, msg = bank_interface.repayment_interface(login_user, float(inp_money))
    if flog:
        print(msg)


# 6、转账功能
@common.logger('转账')
def transfer():
    other_user = input("对方用户名:").strip()
    inp_money = input("请输入转账金额:").strip()
    if not inp_money.isdigit():
        print("转账失败,输入错误")
        return
    flog, msg = bank_interface.transfer_interface(login_user, other_user, float(inp_money))
    if flog:
        print(msg)
    else:
        print(f'转账失败,{msg}')


# 7、查看流水
@common.logger('查看流水')
def check_ledger():
    print("============= 用户日常消费账单 =============")
    ledger_list = user_interface.check_ledger_interface(login_user)
    for ledger in ledger_list:
        print(ledger)
    print("==================  END  ==================")


# ATM功能接口调用函数
@common.is_login
@common.logger('ATM')
def atm_funcs():
    atm_func = '''
    ============  欢迎使用 ATM 程序  ============
    |               0 查询余额                  |
    |               1 提现                      |
    |               2 还款                      |
    |               3 转账                      |
    |               4 查看流水                  |
    ==================  END  ===================
    '''
    while True:
        print(atm_func)
        atm_choice = input("请输入功能编号:【退出:q】").strip()
        if atm_choice.lower() == 'q':
            break
        if atm_choice not in atm_dic:
            print("输入错误,编号不存在")
            continue
        atm_dic.get(atm_choice)()


# 8、购物功能
@common.logger('购物')
def shopping():
    while True:
        # 获取全部商品信息并打印
        shops_dic = shopping_interface.get_shops_interface()
        print("============== 欢迎光临茸茸蛋糕店 ==============")
        for id, shop_dic in shops_dic.items():
            print(f'| 编号{id} 商品名 {shop_dic.get("shop_name")} 价格 {shop_dic.get("price")}元')
        print("====================  END  ====================")
        shop_id = input("请输入要购买的商品编号(返回上一层:q):").strip()
        if shop_id.lower() == 'q':
            break
        count = input("请输入购买的数量:").strip()
        if not count.isdigit() or count == '0':
            print("输入错误")
            continue
        if shop_id not in shops_dic:
            print('商品不存在')
            continue
        # 将对应编号的商品加入购物车
        shopping_interface.add_shop_car_interface(shops_dic[shop_id], int(count))
        print(f"{shops_dic[shop_id]['shop_name']}已加入购物车,请到购物车结算")


# 9、查看购物车
@common.logger('查看购物车')
def check_shop_car():
    shop_car_dic = shopping_interface.check_shop_car_interface(login_user)
    while True:
        shop_id = 0
        print("=================  购物车  =================")
        for shop_name, shop_info in shop_car_dic.items():
            print(f'| 编号【{shop_id}】 商品名 {shop_name} 数量 {shop_info.get("count")}个 总价 {shop_info.get("total")}元')
            shop_id += 1
        print("==================  END  ==================")
        pay_id = input("请选择需要付款的商品编号(离开购物车:q):").strip()
        if pay_id.lower() == 'q':
            break
        if int(pay_id) not in range(len(shop_car_dic)):
            print("商品不存在")
            continue
        # 获取选择付款商品的商品信息并打印
        buy_shop_name = list(shop_car_dic.keys())[int(pay_id)]
        buy_shop_count = shop_car_dic[buy_shop_name].get('count')
        buy_total = shop_car_dic[buy_shop_name].get('total')
        print(f"您将付款的商品为:{buy_shop_name},数量为:{buy_shop_count}个, 总价为:{buy_total}元")
        # 付款
        inp_buy = input("确认付款(Y/n):").strip()
        if inp_buy.lower() == 'y':
            flog, msg = shopping_interface.pay_for_shop_interface(login_user, buy_shop_name, buy_total)
            if flog:
                print(msg)
                break
            else:
                print('msg')
        print("付款失败,用户取消支付")


# 购物商城接口调用函数
@common.is_login
@common.logger('购物商城')
def shop_funcs():
    shop_func = '''
    ============  欢迎来到购物商城  ==============
    |                0 购物                      |
    |                1 查看购物车                |
    |                2 查看流水                  |
    ==================  END  ====================
    '''
    while True:
        print(shop_func)
        shop_choice = input("请输入功能编号:【退出:q】").strip()
        if shop_choice.lower() == 'q':
            break
        if shop_choice not in shop_dic:
            print("输入错误,编号不存在")
            continue
        shop_dic.get(shop_choice)()


# 管理员功能一:增加用户额度
def up_user_balance():
    user_list = user_interface.admin_check_all_user()
    for user in user_list:
        user_info = user_interface.admin_get_user_info(user)
        print(f'用户名:{user_info[0]} 用户余额:{user_info[1]} 用户锁定状态(locked): {user_info[2]}')
    inp_name = input("请输入要修改的用户名:").strip()
    if inp_name not in user_list:
        print("修改失败,用户名错误")
        return
    inp_balance = input("请输入要增加的用户额度:").strip()
    



# 管理员功能二:锁定用户
def locked_user():
    pass


# 10、管理员功能
def admin():
    inp_name = input("请输入管理员账号:").strip()
    inp_pwd = input("请输入密码:").strip()
    if not (inp_name and inp_pwd):
        print('输入不能为空')
    flog, msg = user_interface.admin_interface(inp_name, inp_pwd)
    if flog:
        print(msg)
        global login_user
        login_user = inp_name
        print(f'欢迎,尊敬的{login_user}大人(^・ω・^§)ノ')
        while True:
            admin_func = f'''
            ========= 只给亲爱的{login_user}大人使用的特殊功能哟 =========
                                1 增加用户额度
                                2 锁定用户
                                0 退出登录
            '''
            print(admin_func)
            # 管理员功能字典
            admin_func_dic = {
                '0': None,
                '1': up_user_balance,
                '2': locked_user
            }
            func_id = input(f"请{login_user}大人选择指定功能编号:").strip()
            if func_id not in admin_func_dic:
                print(f"[编号错误] 即便是尊敬的{login_user}大人也会犯错呢(^・ω・^§)ノ")
                continue
            if func_id == '0':
                print("已经要走了吗,真舍不得呢,那么下次再见咯!啾(^・ω・^§)ノ")
                login_user = None
                break
            admin_func_dic[func_id]()
    else:
        print(msg)


# 功能字典
# atm功能字典
atm_dic = {
        '0': check_balance,
        '1': withdraw,
        '2': repayment,
        '3': transfer,
        '4': check_ledger
    }

# 购物车功能字典
shop_dic = {
        '0': shopping,
        '1': check_shop_car,
        '2': check_ledger
    }

# 用户认证功能字典
user_dic = {
    '0': register,
    '1': login,
    '2': atm_funcs,
    '3': shop_funcs,
    '4': admin
}


def main():
    while True:
        msg = '''
            ==========  欢迎来到ATM+商城购物  ==========
                            0 注册
                            1 登录
                            2 ATM
                            3 商城购物
                            4 管理员
            ==================  END  ==================
            '''
        print(msg)
        func_num = input("请输入功能编号【退出:q】:").strip()
        if func_num.lower() == 'q':
            print("欢迎下次光临 (^_^)")
            break
        if func_num in user_dic:
            user_dic.get(func_num)()
            continue
        print("输入错误,编号不存在")

逻辑接口层

user_interface.py

# 用户登陆核心逻辑接口
from db import db_handle
from lib import common


# 用户注册接口
def register_interface(username, password, balance=15000):
    password = common.get_pwd_md5(password)
    user_dic = {
        'username': username,
        'password': password,
        'balance': float(balance),
        'flow': [],
        'shop_car': {},
        'locked': False
    }
    if db_handle.select(username):
        return False, '用户已存在'
    db_handle.save(user_dic)
    return True, '用户注册成功'


# 用户登录接口
def login_interface(username, password):
    user_dic = db_handle.select(username)
    if not user_dic:
        return False, '用户不存在'
    if user_dic.get('locked'):
        return False, '用户已被锁定,请联系管理员解锁'
    password = common.get_pwd_md5(password)
    if password == user_dic.get('password'):
        return True, '登录成功'
    return False, '密码错误'


# 查看流水接口
def check_ledger_interface(login_user):
    user_dic = db_handle.select(login_user)
    flow_list = user_dic["flow"]
    return flow_list


# 管理员接口
def admin_interface(username, password):
    password = db_handle.admin_md5(password)
    if username == 'admin' and password == '84ca85ffb92f710f441ca0212cad819d':
        return True, '登陆成功'
    return False, '登陆失败'


# 管理员获取所有用户名
def admin_check_all_user():
    user_list = db_handle.admin_get_all_user()
    return user_list


# 管理员获取所有用户信息
def admin_get_user_info(user):
    user_dic = db_handle.select(user)
    user_info = [user_dic.get('username'), user_dic.get('balance'), user_dic.get('locked')]
    return user_info

shopping_interface.py

# 购物车核心逻辑接口
import time
from db import db_handle


# 获取商品信息接口
def get_shops_interface():
    shops_dic = db_handle.select_shops()
    return shops_dic


# 添加商品进入购物车接口
def add_shop_car_interface(shop_dic, count):
    from core import src
    total = shop_dic["price"]*count
    shop_name = shop_dic['shop_name']
    shop_dic = {
        f"{shop_name}": {"count": count, "total": total}
    }
    user_dic = db_handle.select(src.login_user)
    # 用户购物车内商品信息
    shop_car_dic = user_dic['shop_car']
    # 如果购物车为空则直接加入
    if user_dic.get('shop_car') == {}:
        user_dic['shop_car'] = shop_dic
    else:
        # 如果商品不在购物车中则直接加入
        if shop_name not in shop_car_dic:
            shop_car_dic.update(shop_dic)
            user_dic['shop_car'] = shop_car_dic
        else:
            # 商品在购物车中,修改商品数量以及总价
            user_dic['shop_car'].get(shop_name)['count'] += count
            user_dic['shop_car'].get(shop_name)['total'] += total
    db_handle.save(user_dic)


# 获取购物车信息接口
def check_shop_car_interface(login_user):
    user_dic = db_handle.select(login_user)
    shop_car_dic = user_dic["shop_car"]
    return shop_car_dic


# 付款接口
def pay_for_shop_interface(login_user, buy_shop_name, buy_total):
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')
    user_dic = db_handle.select(login_user)
    # 获取用户购物车字典
    shop_car_dic = user_dic['shop_car']
    # 获取用户余额
    balance = user_dic['balance']
    if buy_total > balance:
        return False, '支付失败,余额不足'
    balance -= buy_total
    shop_car_dic.pop(buy_shop_name)
    # 修改用户余额
    user_dic['balance'] = balance
    # 删除已付款商品
    user_dic['shop_car'] = shop_car_dic
    # 增加流水
    user_dic['flow'].append(f'{start_time} 用户[{login_user}] 购买商品{buy_shop_name} 消费{buy_total}元')
    # 更新用户数据
    db_handle.save(user_dic)
    return True, '支付成功'

bank_interface.py

# ATM核心逻辑接口
import time
from db import db_handle


# 查询余额接口
def check_balance_interface(login_user):
    user_dic = db_handle.select(login_user)
    return user_dic["balance"]


# 提现接口
def withdraw_interface(login_user, inp_money):
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')
    user_dic = db_handle.select(login_user)
    # 提现金额 + 手续费5%
    money = inp_money * 1.05
    if inp_money > user_dic.get("balance"):
        return False, '余额不足'
    user_dic["balance"] -= money
    user_dic["flow"].append(f'{start_time} 用户[{login_user} 提现{inp_money}元]')
    # 更新用户数据
    db_handle.save(user_dic)
    return True, '提现成功'


# 还款接口
def repayment_interface(login_user, inp_money):
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')
    # 获取用户数据
    user_dic = db_handle.select(login_user)
    user_dic["balance"] += inp_money
    user_dic["flow"].append(f'{start_time} 用户[{login_user}] 还款{inp_money}元')
    # 修改用户数据
    db_handle.save(user_dic)
    return True, '还款成功'


# 转账接口
def transfer_interface(login_user, other_user, inp_money):
    start_time = time.strftime('%Y-%m-%d %H:%M:%S')
    user_dic = db_handle.select(login_user)
    other_dic = db_handle.select(other_user)
    if not other_dic:
        return False, '对方用户不存在'
    if inp_money > user_dic.get("balance"):
        return False, '余额不足'
    other_dic["balance"] += inp_money
    user_dic["balance"] -= inp_money
    user_dic["flow"].append(f'{start_time} 用户[{login_user} 转账{inp_money}元]')
    db_handle.save(other_dic)
    db_handle.save(user_dic)
    return True, '转账成功'


数据处理层

db_handle.py

import os
import json
from conf import settings


def select(username):
    user_json_path = os.path.join(settings.DB_PATH, 'user_data', f'{username}.json')
    if os.path.exists(user_json_path):
        with open(user_json_path, 'rt', encoding='utf-8') as f:
            user_dic = json.load(f)
            return user_dic


def save(user_dic):
    username = user_dic.get('username')
    user_json_path = os.path.join(settings.DB_PATH, 'user_data', f'{username}.json')
    with open(user_json_path, 'w', encoding='utf-8') as f:
        json.dump(user_dic, f)


def select_shops():
    shops_path = os.path.join(settings.DB_PATH, 'shop_class_info.json')
    with open(shops_path, 'rt', encoding='utf-8') as f:
        shops_dic = json.load(f)
        return shops_dic


# 管理员密码加密
def admin_md5(inp_info):
    import hashlib
    u_md = hashlib.md5()
    u_md.update("Umi".encode('utf-8'))
    u_md.update(inp_info.encode('utf-8'))
    u_md.update("管理员".encode('utf-8'))
    inp_info = u_md.hexdigest()
    return inp_info


def admin_get_all_user():
    log_path = os.path.join(settings.BASE_PATH, 'log', 'user.log')
    user_list = []
    with open(log_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip().split()
            if line[2] not in user_list and line[2] != '游客':
                user_list.append(line[2])
        return user_list

common .py

# 密码加密
def get_pwd_md5(password):
    import hashlib
    md5 = hashlib.md5()
    md5.update('憨批'.encode('utf-8'))
    md5.update(password.encode('utf-8'))
    md5.update('憨批'.encode('utf-8'))
    password = md5.hexdigest()
    return password


# 登录认证装饰器
def is_login(func):
    from core import src

    def wrapper(*args, **kwargs):
        if src.login_user is None:
            print("登录后才可使用该功能")
            src.login()
        func(*args, **kwargs)
    return wrapper


# 日志记录功能的装饰器
def logger(func_type):
    from conf import settings
    from core import src
    from logging import config
    from logging import getLogger
    config.dictConfig(settings.LOGGING_DIC)
    def inner(func):
        def wrapper(*args, **kwargs):
            if src.login_user is None:
                src.login_user = '游客'
            logger = getLogger(src.login_user)
            logger.info(f'使用了{func_type}功能')
            func(*args, **kwargs)
        return wrapper
    return inner

settings.py

import os
# 项目根路径
BASE_PATH = os.path.dirname(os.path.dirname(__file__))

# db
DB_PATH = os.path.join(BASE_PATH, 'db')


standard_format = '[%(asctime)s] %(name)s [%(levelname)s][%(message)s]'

simple_format = '[%(levelname)s][%(asctime)s] %(message)s'

test_format = '%(asctime)s %(name)s %(message)s'

# 日志配置字典
LOGGING_DIC = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': standard_format
        },
        'simple': {
            'format': simple_format
        },
        'test': {
            'format': test_format
        },
    },
    'filters': {},
    'handlers': {
        #打印到终端的日志
        'console': {
            'level': 'DEBUG',
            'class': 'logging.StreamHandler',  # 打印到屏幕
            'formatter': 'standard'
        },
        #打印到文件的日志,收集info及以上的日志
        'default': {
            'level': 'DEBUG',
            'class': 'logging.handlers.RotatingFileHandler',  # 保存到文件,日志轮转
            'formatter': 'test',
            # 可以定制日志文件路径
            # BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # log文件的目录
            # LOG_PATH = os.path.join(BASE_DIR,'a1.log')
            'filename': os.path.join(BASE_PATH, 'log', 'user.log'),  # 日志文件
            'maxBytes': 1024*1024*5,  # 日志大小 5M
            'backupCount': 5,
            'encoding': 'utf-8',  # 日志文件的编码,再也不用担心中文log乱码了
        },
    },
    'loggers': {
        # logging.getLogger(__name__)拿到的logger配置
        '': {
            'handlers': ['default', ],  # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
            'level': 'DEBUG', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
            'propagate': False,  # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
        },
    },
}
上一篇:python基础05——深浅copy&while循环


下一篇:POI如何读取一个excel文件中的内容去修改另一个excel文件中的数据