ATM + 购物车
conf存放配置文件
import os
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
# 拼接db目录路径
DB_PATH = os.path.join(BASE_PATH, 'db')
"""
logging配置
"""
# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' \
'[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
# 定义日志输出格式 结束
logfile_dir = os.path.join(BASE_PATH, 'log') # log文件的目录
logfile_name = 'atm_shop_log.log' # log文件名
# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
os.mkdir(logfile_dir)
# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)
# log配置字典
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,收集info及以上的日志
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
'formatter': 'standard',
'filename': logfile_path, # 日志文件
'maxBytes': 1024 * 1024 * 5, # 日志大小5M
'backupCount': 5,
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': True, # 向上(更高level的logger)传递
},
},
}
core核心业务
import os
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
# 拼接db目录路径
DB_PATH = os.path.join(BASE_PATH, 'db')
"""
logging配置
"""
# 定义三种日志输出格式 开始
standard_format = '[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]' \
'[%(levelname)s][%(message)s]' # 其中name为getlogger指定的名字
simple_format = '[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s'
id_simple_format = '[%(levelname)s][%(asctime)s] %(message)s'
# 定义日志输出格式 结束
logfile_dir = os.path.join(BASE_PATH, 'log') # log文件的目录
logfile_name = 'atm_shop_log.log' # log文件名
# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
os.mkdir(logfile_dir)
# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)
# log配置字典
LOGGING_DIC = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': standard_format
},
'simple': {
'format': simple_format
},
},
'filters': {},
'handlers': {
# 打印到终端的日志
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler', # 打印到屏幕
'formatter': 'simple'
},
# 打印到文件的日志,收集info及以上的日志
'default': {
'level': 'DEBUG',
'class': 'logging.handlers.RotatingFileHandler', # 保存到文件
'formatter': 'standard',
'filename': logfile_path, # 日志文件
'maxBytes': 1024 * 1024 * 5, # 日志大小5M
'backupCount': 5,
'encoding': 'utf-8', # 日志文件的编码,再也不用担心中文log乱码了
},
},
'loggers': {
# logging.getLogger(__name__)拿到的logger配置
'': {
'handlers': ['default', 'console'], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
'level': 'DEBUG',
'propagate': True, # 向上(更高level的logger)传递
},
},
}
db数据处理
import os
import json
from conf import settings
# 保存数据
def save(user_dic):
# 拼接用户保存文件
user_path = f'{settings.DB_PATH}/{user_dic["user"]}.json'
# 把用户信息写入文件中
with open(user_path, 'w', encoding='utf-8') as f:
json.dump(user_dic, f, ensure_ascii=False)
f.flush()
# 查看用户数据
def select(user):
# base_path = os.path.dirname(os.path.dirname(__file__))
# db_path = os.path.join(base_path, 'db')
user_path = f'{settings.DB_PATH}/{user}.json'
# 判断用户文件是否存在
if os.path.exists(user_path):
# 若存在, 读取用户信息
with open(user_path, 'r', encoding='utf-8') as f:
user_dic = json.load(f)
return user_dic
interface接口逻辑处理
import os
import json
from conf import settings
# 保存数据
def save(user_dic):
# 拼接用户保存文件
user_path = f'{settings.DB_PATH}/{user_dic["user"]}.json'
# 把用户信息写入文件中
with open(user_path, 'w', encoding='utf-8') as f:
json.dump(user_dic, f, ensure_ascii=False)
f.flush()
# 查看用户数据
def select(user):
# base_path = os.path.dirname(os.path.dirname(__file__))
# db_path = os.path.join(base_path, 'db')
user_path = f'{settings.DB_PATH}/{user}.json'
# 判断用户文件是否存在
if os.path.exists(user_path):
# 若存在, 读取用户信息
with open(user_path, 'r', encoding='utf-8') as f:
user_dic = json.load(f)
return user_dic
管理员接口
from db import db_handler
from lib import common
admin_logger = common.get_logger('admin')
# 冻结用户接口
def lock_interface(user):
user_dic = db_handler.select(user)
user_dic['lock'] = True
db_handler.save(user_dic)
return f'{user}用户冻结成功!'
def unlock_interface(user):
user_dic = db_handler.select(user)
user_dic['lock'] = False
db_handler.save(user_dic)
return f'{user}用户解冻成功!'
# 修改额度接口
def change_limit_interface(user, limit):
user_dic = db_handler.select(user)
user_dic['balance'] = limit
db_handler.save(user_dic)
return f'修改用户[{user}]额度成功!'
银行接口
from db import db_handler
from lib import common
bank_logger = common.get_logger('bank')
# 提现接口
def withdraw_interface(user, money):
user_dic = db_handler.select(user)
if user_dic['balance'] >= money * 1.05:
user_dic['balance'] -= money * 1.05
msg = f'用户[{user}] 提现[{money}]元成功!'
user_dic['flow'].append(msg)
db_handler.save(user_dic)
return True, msg
return False, '余额不足!'
# 还款接口
def repay_interface(user, money):
user_dic = db_handler.select(user)
user_dic['balance'] += money
msg = f'用户{user}, 还款{money}元成功!'
user_dic['flow'].append(msg)
db_handler.save(user_dic)
return True, msg
# 转账接口
def transfer_interface(to_user, from_user, money):
to_user_dic = db_handler.select(to_user)
from_user_dic = db_handler.select(from_user)
if from_user_dic['balance'] >= money:
# 加减钱操作
from_user_dic['balance'] -= money
to_user_dic['balance'] += money
# 拼接流水信息
to_user_flow = f'{to_user}用户接收到用户{from_user}转账{money}元'
from_user_flow = f'{from_user}用户给到用户{to_user}转账{money}元'
# 记录流水
from_user_dic['flow'].append(from_user_flow)
to_user_dic['flow'].append(to_user_flow)
# 保存用户信息
db_handler.save(from_user_dic)
db_handler.save(to_user_dic)
return True, from_user_flow
return False, '余额不足,请充值!'
# 查看流水接口
def check_flow_interface(user):
user_dic = db_handler.select(user)
return user_dic['flow']
# 银行支付接口
def pay_interface(user, cost):
user_dic = db_handler.select(user)
if user_dic['balance'] >= cost:
user_dic['balance'] -= cost
user_dic['flow'].append(f'{user}用户支付{cost}成功!')
db_handler.save(user_dic)
return True
else:
return False
购物车接口
from interface import bank_interface
from db import db_handler
from lib import common
shop_logger = common.get_logger('shop')
# 商城结账接口
def buy_shop_interface(user, cost):
flag = bank_interface.pay_interface(user, cost)
if flag:
return True, '购物成功!'
else:
return False, '余额不足,支付失败!'
# 添加购物车接口
def add_shop_cart_interface(user, shop_cart):
user_dic = db_handler.select(user)
old_cart = user_dic['shop_cart']
# 循环当前购物车
for shop in shop_cart:
if shop in old_cart:
# 获取当前购物车中的商品数量
number = shop_cart[shop]
# 给用户信息中的商品数量做 增值运算
old_cart[shop] += number
else:
# 获取当前购物车中的商品数量
number = shop_cart[shop]
old_cart[shop] = number
user_dic['shop_cart'].update(old_cart)
db_handler.save(user_dic)
return True, '添加购物车成功!'
# 查看购物车接口'
def check_shop_cart_interface(user):
user_dic = db_handler.select(user)
return user_dic['shop_cart']
用户接口
from db import db_handler
from lib import common
from lib import common
user_logger = common.get_logger('user')
# 查看用户接口
def check_user_interface(user):
# 调用数据处理层的select函数,查看用户信息
user_dic = db_handler.select(user)
if user_dic:
return True
# 注册接口
def register_interface(user, pwd, balance=15000):
pwd = common.get_md5(pwd)
user_dic = {
'user': user,
'pwd': pwd,
'balance': balance,
'flow': [],
'shop_cart': {},
'lock': False
}
# 调用保存数据功能
db_handler.save(user_dic)
user_logger.info(f'{user_dic["user"]}用户注册成功!')
return f'{user_dic["user"]}用户注册成功!'
# 登陆接口
def login_interface(user, pwd):
user_dic = db_handler.select(user)
pwd = common.get_md5(pwd)
if user_dic['lock']:
user_logger.info(f'用户[{user}]已被冻结,请联系管理员!')
return False, '用户已被冻结,请联系管理员!'
if user_dic['pwd'] == pwd:
user_logger.info(f'{user}登陆成功!')
return True, f'{user}登陆成功!'
user_logger.info(f'{user}输入密码错误!')
return False, f'{user}密码错误!'
# 查看余额接口
def check_bal_interface(user):
user_dic = db_handler.select(user)
return user_dic['balance']
# 注销接口
def logout_interface():
from core import src
user = src.user_info['user']
src.user_info['user'] = None
return True, f'{user}用户注销成功!欢迎官人下次再来!'
lib公共文件
import hashlib
import logging.config
from conf import settings
# MD5加密功能
def get_md5(pwd):
val = '天王盖地虎'
md5 = hashlib.md5()
md5.update(pwd.encode('utf-8'))
md5.update(val.encode('utf-8'))
res = md5.hexdigest()
return res
# 登陆认证功能
def login_auth(func):
from core import src
def inner(*args, **kwargs):
# 判断用户存在, 则执行被装饰函数
if src.user_info['user']:
res = func(*args, **kwargs)
return res
# 否则,执行登陆功能
else:
print('请先登陆')
src.login()
return inner
# 日志功能
def get_logger(type_name):
logging.config.dictConfig(settings.LOGGING_DIC)
logger = logging.getLogger(type_name)
return logger
if __name__ == '__main__':
res = get_md5('123')
print(res)
logger = get_logger('tank')
log日志
start.py启动文件
import os
import sys
BASE_PATH = os.path.dirname(__file__)
sys.path.append(BASE_PATH)
from core import src
if __name__ == '__main__':
src.run()