- 三层架构
- 用户视图层
- 用于与用户进行交互
- 接收用户输入的内容
- 打印输出内容给用户
- 简单的逻辑处理
- 逻辑接口层
- 核心业务逻辑,相当于用户视图与数据处理层的桥梁
- 接收视图层传递过来的参数进行逻辑处理
- 返回结果给视图层
- 数据处理层
- 做数据的
- 增
- 删
- 查
- 改
- ATM 项目根目录
- readme.md 项目的说明书
- start.py 项目启动文件
import sys
import os
from core import src
sys.path.append(os.path.dirname(__file__))
if __name__ == '__main__':
src.run()
- conf 配置文件
- settings.py
import os
BASE_PATH=os.path.dirname(os.path.dirname(__file__))
USER_DATA_PATH=os.path.join(BASE_PATH,'db','user_data')
"""
logging配置
"""
# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' \
'[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
# 定义日志输出格式 结束
# ****************注意1: log文件的目录
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
logfile_dir = os.path.join(BASE_PATH, 'log')
# print(logfile_dir)
# ****************注意2: log文件名
logfile_name = 'atm.log'
# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
os.mkdir(logfile_dir)
# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,收集info及以上的日志
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
'formatter': 'standard',
'filename': logfile_path, # 日志文件
'maxBytes': 1024 * 1024 * 5, # 日志大小 5M
'backupCount': 5,
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': True, # 向上(更高level的logger)传递
},
},
}
- lib 公共方法文件
- common.py
import hashlib
from core import src
import logging.config
from conf import settings
def pwd5(password):
pwd_5=hashlib.md5
pwd_5.update(password.encode('utf-8'))
salt='乳酸菌,喝了这一瓶,还有下一瓶'
pwd_5.update(salt.encode('utf-8'))
return pwd_5.hexdigest()
def auth(fiunc):
def inner(*args,**kwargs):
if src.Login_user:
res=auth(*args,**kwargs)
return res
else:
print('您目前尚未登录,无法操作该功能!')
src.login()
return inner
def logger(type):
logging.config.dictConfig(settings.LOGGING_DIC)
logger=logging.getLogger(type)
return logger
- core(用户视图层) 存放用户视图层代码文件
- src.py
from interface import user_interface
from lib import common
from interface import shop_interface
from interface import bank_interface
Login_user=None
def register():
while True:
username=input('请输入用户名称:').strip()
passwoed=input('请输入用户密码:').strip()
r_passwoed=input('请输入用户密码:').strip()
if passwoed == r_passwoed:
print('恭喜您,注册成功!')
flag,msg=user_interface.register_interface(username,passwoed)
if flag:
print(msg)
break
else:
print(msg)
def login():
while True:
username = input('请输入用户名称:').strip()
passwoed = input('请输入用户密码:').strip()
flag, msg = user_interface.log_interface(username, passwoed)
if flag:
print(msg)
global Login_user
Login_user=username
break
else:
print(msg)
@common.auth
def check_balance():
amount=user_interface.check_balance_interface(Login_user)
print(f'用户:{Login_user}目前账户余额为{amount}')
@common.auth
def withdraw():
while True:
out_money=input('请输入提现金额:').strip()
if not out_money.isdigit():
print('请重新输入:')
continue
else:
flag,msg=bank_interface.withdraw_interface(Login_user,out_money)
if flag:
print(msg)
break
else:
print(msg)
@common.auth
def repayment():
while True:
in_money=input('请输入还款金额:').strip()
if not in_money.isdigit():
print('请重新输入:')
continue
in_money=float(in_money)
if in_money > 0:
flag,msg=bank_interface.repayment_interface(Login_user,in_money)
if flag:
print(msg)
break
@common.auth
def transfer():
while True:
to_username = input('请输入用户名称:').strip()
in_money = input('请输入还款金额:').strip()
if not in_money.isdigit():
print('请重新输入:')
continue
in_money=float(in_money)
if in_money > 0:
flag,msg=bank_interface.repayment_interface(Login_user,to_username,in_money)
if flag:
print(msg)
break
else:
print(msg)
@common.auth
def check_flow():
while True:
flag,msg=bank_interface.check_flow_interface(Login_user)
if flag:
print(msg)
break
else:
print(msg)
@common.auth
def shopping():
while True:
shop_list = [
['凤爪', 30],
['三友凤爪', 250],
['泡椒凤爪', 28],
['山东凤爪', 15],
['陕西凤爪', 100000],
['好吃的凤爪', 1]
]
car={}
for index,shop in enumerate(shop_list):
shop_name,shop_price=shop
print(f'编号:{index}',f'商品:{shop_name}'f'单价:{shop_price}')
choice=input('请输入商品编号(是否结账输入y or n):').strip()
if choice == 'y':
if not car:
print('当前购物车无商品!')
continue
flag, msg = shop_interface.shopping_interface(Login_user,car)
if flag:
print(msg)
break
else:
print(msg)
elif choice == 'n':
if not car:
print('当前无购物车!')
continue
flag, msg =shop_interface.add_car(Login_user,car)
if flag:
print(msg)
break
if not choice.isdigit():
print('命令不存在,请重新输入!')
continue
choice = int(choice)
if choice not in range(len(shop_list)):
print('命令不存在,请重新输入!')
continue
shop_name, shop_price=shop_list[choice]
if shop_name in car:
car[shop_name][1]+=1
else:
car[shop_name]=[shop_price,1]
print(f'当前购物车:{car}')
@common.auth
def check_shopping_car():
shop_car=shop_interface.check_shopping_car_interface(Login_user)
print(shop_car)
@common.auth
def change_shopping_car():
while True:
flag, msg=shop_interface.change_shopping_car_interface(Login_user)
if flag:
print(msg)
break
else:
print(msg)
@common.auth
def admin():
from core import admin
admin.admin_run()
func_dic={
'1': register,
'2': login,
'3': check_balance,
'4': withdraw,
'5': repayment,
'6': transfer,
'7': check_flow,
'8': shopping,
'9': check_shopping_car,
'10':change_shopping_car,
'11': admin
}
def run():
while True:
print('''
===== 欢迎来到本网站 =====
1、注册功能
2、登录功能
3、查看余额
4、提现功能
5、还款功能
6、转账功能
7、查看流水
8、购物功能
9、查看购物车
10、修改购物车
11、管理员功能
========= end =========
''')
choice=input('请输入命令编号(纯数字):').strip()
if choice not in func_dic:
print('请正确输入命令编号!')
continue
else:
func_dic.get(choice)()
-admin.py
from core import src
from interface import admin_interface
def add_user():
src.register()
def change_balance():
while True:
change_username=input('请输入需要修改的用户名称:').strip()
amount=input('请输入修改后的额度:').strip()
r_amount=input('请确认修改后的额度:').strip()
if amount != r_amount:
print('请重新输入:')
continue
else:
if not amount.isdigit():
continue
flag, msg =admin_interface.change_balance_intwerface(change_username,amount)
if flag:
print(msg)
break
else:
print(msg)
def lock_user():
while True:
lock_username = input('请输入需要冻结的用户名称:').strip()
flag, msg = admin_interface.lock_user_interface(lock_username)
if flag:
print(msg)
break
else:
print(msg)
admin_dic = {
'1': add_user,
'2': change_balance,
'3': lock_user,
}
def admin_run():
while True:
print('''
1、添加账户
2、修改额度
3、冻结账户
''')
choice = input('请输入管理员功能编号: ').strip()
if choice not in admin_dic:
print('请输入正确的功能编号!')
continue
admin_dic.get(choice)()
- interface(逻辑接口层) 存放核心业务逻辑代码
- user_interface.py 用户相关的接口
from db import db_handler
from lib import common
user_logger=common.logger(log_type='user')
def register_interface(username,password,balance=15000):
user_dic=db_handler.select(username)
if user_dic:
return False,'用户名存在!'
password=common.pwd5(password)
user_dic = {
'username': username,
'password': password,
'balance': balance,
'flow': [],
'shop_car': {},
'locked': False
}
db_handler.save(user_dic)
msg=f'{username}注册成功!'
user_logger.info(msg)
return True,msg
def log_interface(username,password):
user_dic = db_handler.select(username)
if user_dic:
if user_dic('locked'):
return False,'当前用户已被锁定'
password=common.pwd5(password)
if password == user_dic.get('password'):
msg=f'用户:{user_dic}登陆成功!'
user_logger.info(msg)
return True,msg
else:
msg='用户名不存在,请重新输入!'
user_logger.warn(msg)
return False,msg
msg=f'用户名{username}不存在,请重新输入'
return msg
def check_balance_interface(username):
user_dic = db_handler.select(username)
return user_dic['balance']
- bank_interface.py 银行相关的接口
from db import db_handler
from lib import common
bank_logger=common.logger(log_type='bank')
def withdraw_interface(username,out_count):
user_dic = db_handler.select(username)
balance=float(user_dic.get('balance'))
money=float(out_count) * 1.05
if balance >= money:
balance -= money
flow=f'用户{username}成功提现{out_count}手续费为{money-float(out_count)}'
user_dic.get('flow').append(flow)
user_dic[balance]=balance
db_handler.save(user_dic)
bank_logger.info(flow)
return True,flow
return False,'当前余额不足!'
def repayment_interface(username,in_money):
user_dic = db_handler.select(username)
user_dic['balance'] += in_money
flow = f'用户{username}充值{in_money}成功!'
user_dic.get('flow').append(flow)
db_handler.save(user_dic)
bank_logger.info(flow)
return True,flow
def transfer_interface(username,to_username,in_money):
user_dic1 = db_handler.select(username)
user_dic2 = db_handler.select(to_username)
if not user_dic2:
return False,'对方账户不存在!'
if user_dic1['balance'] >= in_money:
user_dic1['balance'] -= in_money
user_dic2['balance'] += in_money
flow1 = '用户{username}给用户{to_username}转账{in_money}成功!'
user_dic1.get('flow').append(flow1)
db_handler.save(user_dic1)
flow2 = '用户{to_username}收到汇款{in_money}!'
user_dic2.get('flow').append(flow2)
db_handler.save(user_dic2)
bank_logger.info(flow1)
return True,flow1
return False,'余额不足!'
def check_flow_interface(username):
flow_list = db_handler.select(username).get('flow')
if flow_list:
return True,flow_list
else:
msg=f'{username}当前没有流水!'
bank_logger.info(msg)
return False,msg
def pay_interface(username, cost):
user_dic = db_handler.select(username)
if user_dic.get('balance') >= cost:
user_dic['balance'] -= cost
# 清空购物车,清空购物车字典
user_dic['shop_car'].clear()
db_handler.save(user_dic)
return True
return False
- shop_interface.py 购物相关的接口
from db import db_handler
from lib import common
shop_logger=common.logger(log_type='shop')
def shopping_interface(username,car):
cost=0
for shop in car.values():
price,count=shop
cost += (price*count)
from interface import bank_interface
flag=bank_interface.pay_interface(username,cost)
if flag:
msg = '支付成功,商家正在准备!'
shop_logger.info(msg)
return True,
return False,'当前账户余额不足!'
def add_car(username,car):
user_dic = db_handler.select(username)
shop_car=user_dic.get('shop_car')
for shop_name,data in car.items():
number=data[1]
if shop_name in shop_car:
user_dic['shop_car'][shop_name][1] += number
else:
user_dic['shop_car'].updata(shop_name,data)
db_handler.save(user_dic)
msg = '购物车添加成功!'
shop_logger.info(msg)
return True,msg
def check_shopping_car_interface(username):
user_dic = db_handler.select(username)
return user_dic.get('shop_car')
def change_shopping_car_interface(username):
user_dic = db_handler.select(username)
shop_car = user_dic.get('shop_car')
if shop_car:
for shop_name,data in shop_car.items():
number=data[1]
shopname=input('请输入需要修改个数的商品名称:').strip()
count=input('请输入需要修改后的个数:').strip()
if not count.isdigit():
print('请重新输入:')
continue
user_dic['shop_car'][shopname][1]=count
db_handler.save(user_dic)
msg = '购物车修改成功!'
shop_logger.info(msg)
return True, msg
return False,'当前购物车为空!'
- admin_interface.py 管理员相关的接口
from db import db_handler
from lib import common
admin_logger=common.logger(log_type='admin')
def change_balance_intwerface(username,money):
user_dic = db_handler.select(username)
if user_dic:
user_dic['balance']=float(money)
msg=f'用户{username}额度修改成功!'
admin_logger.info(msg)
return True,msg
return False,f'用户{username}不存在!'
def lock_user_interface(username):
user_dic = db_handler.select(username)
if user_dic:
user_dic['locked'] = True
db_handler.save(user_dic)
msg = f'用户{username}冻结成功!'
admin_logger.info(msg)
return True, msg
return False, f'冻结用户{username}不存在!'
- db(数据处理层) 存放数据与数据处理层代码
- db_handler.py 数据处理层代码
import json
import os
from conf import settings
def select(username):
user_path = os.path.join(settings.USER_DATA_PATH, f'{username}.json')
if os.path.exists(user_path):
with open(user_path, mode='rt', encoding='utf-8')as f:
user_dict=json.load(f)
return user_dict
def save(user_dic):
username=user_dic.get('username')
user_path = os.path.join(
settings.USER_DATA_PATH, f'{username}.json'
)
with open(user_path, 'w', encoding='utf-8') as f:
json.dump(user_dic, f, ensure_ascii=False)
- user_data 用户数据
- log 存放日志文件