启动文件
run.py
from core import src
if __name__ == '__main__':
src.main()
用户视图层
src.py
# 用户视图层
from interface import user_interface
from interface import bank_interface
from interface import shopping_interface
from lib import common
# 记录用户登录状态
login_user = None
# 1、注册功能
@common.logger('注册')
def register():
while True:
inp_user = input("请输入用户名:").strip()
inp_pwd = input("请输入密码:").strip()
inp_pwd2 = input("请确认密码:").strip()
if not (inp_user and inp_pwd and inp_pwd2):
print("输入不能为空")
continue
if inp_pwd != inp_pwd2:
print("两次密码不一致")
continue
flag, msg = user_interface.register_interface(inp_user, inp_pwd)
if flag:
print(msg)
break
else:
print(msg)
# 2、登录功能
@common.logger('登录')
def login():
while True:
inp_user = input("请输入用户名:").strip()
inp_pwd = input("请输入密码:").strip()
if not (inp_user and inp_pwd):
print("输入不能为空")
continue
flog, msg = user_interface.login_interface(inp_user, inp_pwd)
if flog:
global login_user
login_user = inp_user
print(f'{msg},欢迎您[{login_user}]')
break
else:
print(msg)
# 3、查看余额
@common.logger('查看余额')
def check_balance():
bal = bank_interface.check_balance_interface(login_user)
print(f'用户[{login_user}],您的账户余额为:{bal}元')
print("================== END ==================")
# 4、提现功能
@common.logger('提现')
def withdraw():
inp_money = input("请输入提现金额【会自动扣除5%手续费】:").strip()
if not inp_money.isdigit():
print("提现失败,输入错误")
return
flog, msg = bank_interface.withdraw_interface(login_user, float(inp_money))
if flog:
print(msg)
else:
print(msg)
# 5、还款功能
@common.logger('还款')
def repayment():
inp_money = input("请输入还款金额:").strip()
if not inp_money.isdigit():
print("还款失败,输入错误")
return
flog, msg = bank_interface.repayment_interface(login_user, float(inp_money))
if flog:
print(msg)
# 6、转账功能
@common.logger('转账')
def transfer():
other_user = input("对方用户名:").strip()
inp_money = input("请输入转账金额:").strip()
if not inp_money.isdigit():
print("转账失败,输入错误")
return
flog, msg = bank_interface.transfer_interface(login_user, other_user, float(inp_money))
if flog:
print(msg)
else:
print(f'转账失败,{msg}')
# 7、查看流水
@common.logger('查看流水')
def check_ledger():
print("============= 用户日常消费账单 =============")
ledger_list = user_interface.check_ledger_interface(login_user)
for ledger in ledger_list:
print(ledger)
print("================== END ==================")
# ATM功能接口调用函数
@common.is_login
@common.logger('ATM')
def atm_funcs():
atm_func = '''
============ 欢迎使用 ATM 程序 ============
| 0 查询余额 |
| 1 提现 |
| 2 还款 |
| 3 转账 |
| 4 查看流水 |
================== END ===================
'''
while True:
print(atm_func)
atm_choice = input("请输入功能编号:【退出:q】").strip()
if atm_choice.lower() == 'q':
break
if atm_choice not in atm_dic:
print("输入错误,编号不存在")
continue
atm_dic.get(atm_choice)()
# 8、购物功能
@common.logger('购物')
def shopping():
while True:
# 获取全部商品信息并打印
shops_dic = shopping_interface.get_shops_interface()
print("============== 欢迎光临茸茸蛋糕店 ==============")
for id, shop_dic in shops_dic.items():
print(f'| 编号{id} 商品名 {shop_dic.get("shop_name")} 价格 {shop_dic.get("price")}元')
print("==================== END ====================")
shop_id = input("请输入要购买的商品编号(返回上一层:q):").strip()
if shop_id.lower() == 'q':
break
count = input("请输入购买的数量:").strip()
if not count.isdigit() or count == '0':
print("输入错误")
continue
if shop_id not in shops_dic:
print('商品不存在')
continue
# 将对应编号的商品加入购物车
shopping_interface.add_shop_car_interface(shops_dic[shop_id], int(count))
print(f"{shops_dic[shop_id]['shop_name']}已加入购物车,请到购物车结算")
# 9、查看购物车
@common.logger('查看购物车')
def check_shop_car():
shop_car_dic = shopping_interface.check_shop_car_interface(login_user)
while True:
shop_id = 0
print("================= 购物车 =================")
for shop_name, shop_info in shop_car_dic.items():
print(f'| 编号【{shop_id}】 商品名 {shop_name} 数量 {shop_info.get("count")}个 总价 {shop_info.get("total")}元')
shop_id += 1
print("================== END ==================")
pay_id = input("请选择需要付款的商品编号(离开购物车:q):").strip()
if pay_id.lower() == 'q':
break
if int(pay_id) not in range(len(shop_car_dic)):
print("商品不存在")
continue
# 获取选择付款商品的商品信息并打印
buy_shop_name = list(shop_car_dic.keys())[int(pay_id)]
buy_shop_count = shop_car_dic[buy_shop_name].get('count')
buy_total = shop_car_dic[buy_shop_name].get('total')
print(f"您将付款的商品为:{buy_shop_name},数量为:{buy_shop_count}个, 总价为:{buy_total}元")
# 付款
inp_buy = input("确认付款(Y/n):").strip()
if inp_buy.lower() == 'y':
flog, msg = shopping_interface.pay_for_shop_interface(login_user, buy_shop_name, buy_total)
if flog:
print(msg)
break
else:
print('msg')
print("付款失败,用户取消支付")
# 购物商城接口调用函数
@common.is_login
@common.logger('购物商城')
def shop_funcs():
shop_func = '''
============ 欢迎来到购物商城 ==============
| 0 购物 |
| 1 查看购物车 |
| 2 查看流水 |
================== END ====================
'''
while True:
print(shop_func)
shop_choice = input("请输入功能编号:【退出:q】").strip()
if shop_choice.lower() == 'q':
break
if shop_choice not in shop_dic:
print("输入错误,编号不存在")
continue
shop_dic.get(shop_choice)()
# 管理员功能一:增加用户额度
def up_user_balance():
user_list = user_interface.admin_check_all_user()
for user in user_list:
user_info = user_interface.admin_get_user_info(user)
print(f'用户名:{user_info[0]} 用户余额:{user_info[1]} 用户锁定状态(locked): {user_info[2]}')
inp_name = input("请输入要修改的用户名:").strip()
if inp_name not in user_list:
print("修改失败,用户名错误")
return
inp_balance = input("请输入要增加的用户额度:").strip()
# 管理员功能二:锁定用户
def locked_user():
pass
# 10、管理员功能
def admin():
inp_name = input("请输入管理员账号:").strip()
inp_pwd = input("请输入密码:").strip()
if not (inp_name and inp_pwd):
print('输入不能为空')
flog, msg = user_interface.admin_interface(inp_name, inp_pwd)
if flog:
print(msg)
global login_user
login_user = inp_name
print(f'欢迎,尊敬的{login_user}大人(^・ω・^§)ノ')
while True:
admin_func = f'''
========= 只给亲爱的{login_user}大人使用的特殊功能哟 =========
1 增加用户额度
2 锁定用户
0 退出登录
'''
print(admin_func)
# 管理员功能字典
admin_func_dic = {
'0': None,
'1': up_user_balance,
'2': locked_user
}
func_id = input(f"请{login_user}大人选择指定功能编号:").strip()
if func_id not in admin_func_dic:
print(f"[编号错误] 即便是尊敬的{login_user}大人也会犯错呢(^・ω・^§)ノ")
continue
if func_id == '0':
print("已经要走了吗,真舍不得呢,那么下次再见咯!啾(^・ω・^§)ノ")
login_user = None
break
admin_func_dic[func_id]()
else:
print(msg)
# 功能字典
# atm功能字典
atm_dic = {
'0': check_balance,
'1': withdraw,
'2': repayment,
'3': transfer,
'4': check_ledger
}
# 购物车功能字典
shop_dic = {
'0': shopping,
'1': check_shop_car,
'2': check_ledger
}
# 用户认证功能字典
user_dic = {
'0': register,
'1': login,
'2': atm_funcs,
'3': shop_funcs,
'4': admin
}
def main():
while True:
msg = '''
========== 欢迎来到ATM+商城购物 ==========
0 注册
1 登录
2 ATM
3 商城购物
4 管理员
================== END ==================
'''
print(msg)
func_num = input("请输入功能编号【退出:q】:").strip()
if func_num.lower() == 'q':
print("欢迎下次光临 (^_^)")
break
if func_num in user_dic:
user_dic.get(func_num)()
continue
print("输入错误,编号不存在")
逻辑接口层
user_interface.py
# 用户登陆核心逻辑接口
from db import db_handle
from lib import common
# 用户注册接口
def register_interface(username, password, balance=15000):
password = common.get_pwd_md5(password)
user_dic = {
'username': username,
'password': password,
'balance': float(balance),
'flow': [],
'shop_car': {},
'locked': False
}
if db_handle.select(username):
return False, '用户已存在'
db_handle.save(user_dic)
return True, '用户注册成功'
# 用户登录接口
def login_interface(username, password):
user_dic = db_handle.select(username)
if not user_dic:
return False, '用户不存在'
if user_dic.get('locked'):
return False, '用户已被锁定,请联系管理员解锁'
password = common.get_pwd_md5(password)
if password == user_dic.get('password'):
return True, '登录成功'
return False, '密码错误'
# 查看流水接口
def check_ledger_interface(login_user):
user_dic = db_handle.select(login_user)
flow_list = user_dic["flow"]
return flow_list
# 管理员接口
def admin_interface(username, password):
password = db_handle.admin_md5(password)
if username == 'admin' and password == '84ca85ffb92f710f441ca0212cad819d':
return True, '登陆成功'
return False, '登陆失败'
# 管理员获取所有用户名
def admin_check_all_user():
user_list = db_handle.admin_get_all_user()
return user_list
# 管理员获取所有用户信息
def admin_get_user_info(user):
user_dic = db_handle.select(user)
user_info = [user_dic.get('username'), user_dic.get('balance'), user_dic.get('locked')]
return user_info
# 购物车核心逻辑接口
import time
from db import db_handle
# 获取商品信息接口
def get_shops_interface():
shops_dic = db_handle.select_shops()
return shops_dic
# 添加商品进入购物车接口
def add_shop_car_interface(shop_dic, count):
from core import src
total = shop_dic["price"]*count
shop_name = shop_dic['shop_name']
shop_dic = {
f"{shop_name}": {"count": count, "total": total}
}
user_dic = db_handle.select(src.login_user)
# 用户购物车内商品信息
shop_car_dic = user_dic['shop_car']
# 如果购物车为空则直接加入
if user_dic.get('shop_car') == {}:
user_dic['shop_car'] = shop_dic
else:
# 如果商品不在购物车中则直接加入
if shop_name not in shop_car_dic:
shop_car_dic.update(shop_dic)
user_dic['shop_car'] = shop_car_dic
else:
# 商品在购物车中,修改商品数量以及总价
user_dic['shop_car'].get(shop_name)['count'] += count
user_dic['shop_car'].get(shop_name)['total'] += total
db_handle.save(user_dic)
# 获取购物车信息接口
def check_shop_car_interface(login_user):
user_dic = db_handle.select(login_user)
shop_car_dic = user_dic["shop_car"]
return shop_car_dic
# 付款接口
def pay_for_shop_interface(login_user, buy_shop_name, buy_total):
start_time = time.strftime('%Y-%m-%d %H:%M:%S')
user_dic = db_handle.select(login_user)
# 获取用户购物车字典
shop_car_dic = user_dic['shop_car']
# 获取用户余额
balance = user_dic['balance']
if buy_total > balance:
return False, '支付失败,余额不足'
balance -= buy_total
shop_car_dic.pop(buy_shop_name)
# 修改用户余额
user_dic['balance'] = balance
# 删除已付款商品
user_dic['shop_car'] = shop_car_dic
# 增加流水
user_dic['flow'].append(f'{start_time} 用户[{login_user}] 购买商品{buy_shop_name} 消费{buy_total}元')
# 更新用户数据
db_handle.save(user_dic)
return True, '支付成功'
bank_interface.py
# ATM核心逻辑接口
import time
from db import db_handle
# 查询余额接口
def check_balance_interface(login_user):
user_dic = db_handle.select(login_user)
return user_dic["balance"]
# 提现接口
def withdraw_interface(login_user, inp_money):
start_time = time.strftime('%Y-%m-%d %H:%M:%S')
user_dic = db_handle.select(login_user)
# 提现金额 + 手续费5%
money = inp_money * 1.05
if inp_money > user_dic.get("balance"):
return False, '余额不足'
user_dic["balance"] -= money
user_dic["flow"].append(f'{start_time} 用户[{login_user} 提现{inp_money}元]')
# 更新用户数据
db_handle.save(user_dic)
return True, '提现成功'
# 还款接口
def repayment_interface(login_user, inp_money):
start_time = time.strftime('%Y-%m-%d %H:%M:%S')
# 获取用户数据
user_dic = db_handle.select(login_user)
user_dic["balance"] += inp_money
user_dic["flow"].append(f'{start_time} 用户[{login_user}] 还款{inp_money}元')
# 修改用户数据
db_handle.save(user_dic)
return True, '还款成功'
# 转账接口
def transfer_interface(login_user, other_user, inp_money):
start_time = time.strftime('%Y-%m-%d %H:%M:%S')
user_dic = db_handle.select(login_user)
other_dic = db_handle.select(other_user)
if not other_dic:
return False, '对方用户不存在'
if inp_money > user_dic.get("balance"):
return False, '余额不足'
other_dic["balance"] += inp_money
user_dic["balance"] -= inp_money
user_dic["flow"].append(f'{start_time} 用户[{login_user} 转账{inp_money}元]')
db_handle.save(other_dic)
db_handle.save(user_dic)
return True, '转账成功'
数据处理层
db_handle.py
import os
import json
from conf import settings
def select(username):
user_json_path = os.path.join(settings.DB_PATH, 'user_data', f'{username}.json')
if os.path.exists(user_json_path):
with open(user_json_path, 'rt', encoding='utf-8') as f:
user_dic = json.load(f)
return user_dic
def save(user_dic):
username = user_dic.get('username')
user_json_path = os.path.join(settings.DB_PATH, 'user_data', f'{username}.json')
with open(user_json_path, 'w', encoding='utf-8') as f:
json.dump(user_dic, f)
def select_shops():
shops_path = os.path.join(settings.DB_PATH, 'shop_class_info.json')
with open(shops_path, 'rt', encoding='utf-8') as f:
shops_dic = json.load(f)
return shops_dic
# 管理员密码加密
def admin_md5(inp_info):
import hashlib
u_md = hashlib.md5()
u_md.update("Umi".encode('utf-8'))
u_md.update(inp_info.encode('utf-8'))
u_md.update("管理员".encode('utf-8'))
inp_info = u_md.hexdigest()
return inp_info
def admin_get_all_user():
log_path = os.path.join(settings.BASE_PATH, 'log', 'user.log')
user_list = []
with open(log_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip().split()
if line[2] not in user_list and line[2] != '游客':
user_list.append(line[2])
return user_list
common .py
# 密码加密
def get_pwd_md5(password):
import hashlib
md5 = hashlib.md5()
md5.update('憨批'.encode('utf-8'))
md5.update(password.encode('utf-8'))
md5.update('憨批'.encode('utf-8'))
password = md5.hexdigest()
return password
# 登录认证装饰器
def is_login(func):
from core import src
def wrapper(*args, **kwargs):
if src.login_user is None:
print("登录后才可使用该功能")
src.login()
func(*args, **kwargs)
return wrapper
# 日志记录功能的装饰器
def logger(func_type):
from conf import settings
from core import src
from logging import config
from logging import getLogger
config.dictConfig(settings.LOGGING_DIC)
def inner(func):
def wrapper(*args, **kwargs):
if src.login_user is None:
src.login_user = '游客'
logger = getLogger(src.login_user)
logger.info(f'使用了{func_type}功能')
func(*args, **kwargs)
return wrapper
return inner
settings.py
import os
# 项目根路径
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
# db
DB_PATH = os.path.join(BASE_PATH, 'db')
standard_format = '[%(asctime)s] %(name)s [%(levelname)s][%(message)s]'
simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
test_format = '%(asctime)s %(name)s %(message)s'
# 日志配置字典
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
'test': {
'format': test_format
},
},
'filters': {},
'handlers': {
#打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'standard'
},
#打印到文件的日志,收集info及以上的日志
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件,日志轮转
'formatter': 'test',
# 可以定制日志文件路径
# BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # log文件的目录
# LOG_PATH = os.path.join(BASE_DIR,'a1.log')
'filename': os.path.join(BASE_PATH, 'log', 'user.log'), # 日志文件
'maxBytes': 1024*1024*5, # 日志大小 5M
'backupCount': 5,
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['default', ], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG', # loggers(第一层日志级别关限制)--->handlers(第二层日志级别关卡限制)
'propagate': False, # 默认为True,向上(更高level的logger)传递,通常设置为False即可,否则会一份日志向上层层传递
},
},
}