标签:添加 interface http ret 业务逻辑 效果 ase flow opp
# 目录文件结构
- ATM
- conf
- settings.py
- core
- src.py
- db
- db_handler.py
- interface
- admin_interface.py
- bank_interface.py
- shop_interface.py
- user_interface.py
- lib
- common.py
- log
start.py
购物车项目拆分为三部分,前端层(面向用户)接口层 业务处理层
前端必须通过接口层才能调用到业务处理层
购物车功能具备:
1. 注册 2. 登录 3. 查看余额 4. 提现 5. 还款 6. 转账 7. 查看流水 8. 购物车功能 9. 查看购物车 10. 注销 11. 管理员功能
import os
import sys
from core import src
BASE_PATH = os.path.dirname(__file__)
sys.path.append(BASE_PATH)
if __name__ == ‘__main__‘:
src.run()
import os
BASE_PATH = os.path.dirname(os.path.dirname(__file__))
DB_PATH = os.path.join(BASE_PATH, ‘db‘)
"""
logging配置
"""
# 定义三种日志输出格式 开始
standard_format = ‘[%(asctime)s][%(threadName)s:%(thread)d][task_id:%(name)s][%(filename)s:%(lineno)d]‘ ‘[%(levelname)s][%(message)s]‘ # 其中name为getlogger指定的名字
simple_format = ‘[%(levelname)s][%(asctime)s][%(filename)s:%(lineno)d]%(message)s‘
id_simple_format = ‘[%(levelname)s][%(asctime)s] %(message)s‘
# 定义日志输出格式 结束
logfile_dir = os.path.join(BASE_PATH, ‘log‘) # log文件的目录
logfile_name = ‘atm_shop_log.log‘ # log文件名
# 如果不存在定义的日志目录就创建一个
if not os.path.isdir(logfile_dir):
os.mkdir(logfile_dir)
# log文件的全路径
logfile_path = os.path.join(logfile_dir, logfile_name)
# log配置字典
LOGGING_DIC = {
‘version‘: 1,
‘disable_existing_loggers‘: False,
‘formatters‘: {
‘standard‘: {
‘format‘: standard_format
},
‘simple‘: {
‘format‘: simple_format
},
},
‘filters‘: {},
‘handlers‘: {
# 打印到终端的日志
‘console‘: {
‘level‘: ‘DEBUG‘,
‘class‘: ‘logging.StreamHandler‘, # 打印到屏幕
‘formatter‘: ‘simple‘
},
# 打印到文件的日志,收集info及以上的日志
‘default‘: {
‘level‘: ‘DEBUG‘,
‘class‘: ‘logging.handlers.RotatingFileHandler‘, # 保存到文件
‘formatter‘: ‘standard‘,
‘filename‘: logfile_path, # 日志文件
‘maxBytes‘: 1024 * 1024 * 5, # 日志大小5M
‘backupCount‘: 5,
‘encoding‘: ‘utf-8‘, # 日志文件的编码,再也不用担心中文log乱码了
},
},
‘loggers‘: {
# logging.getLogger(__name__)拿到的logger配置
‘‘: {
‘handlers‘: [‘default‘, ‘console‘], # 这里把上面定义的两个handler都加上,即log数据既写入文件又打印到屏幕
‘level‘: ‘DEBUG‘,
‘propagate‘: True, # 向上(更高level的logger)传递
},
},
}
from interface import user_interface
from lib import common
from interface import bank_interface
from interface import shop_interface
from interface import admin_interface
user_info = {
‘user‘: None,
}
# 注册
def register():
print(‘注册功能‘)
while True:
user = input(‘请输入用户名:‘).strip()
# 调用接口判断用户是否存在
flag = user_interface.check_user_interface(user)
if flag:
print(‘用户已存在,请重新输入!‘)
continue
pwd = input(‘请输入密码: ‘).strip()
re_pwd = input(‘请确认密码:‘).strip()
if pwd == re_pwd:
# 业务逻辑
# 调用接口层注册接口保存用户信息
# user_dic = {
# ‘user‘: user,
# ‘pwd‘: pwd,
# ‘balance‘: 15000
# }
msg = user_interface.register_interface(user,pwd)
if msg:
print(msg)
break
else:
print(‘注册失败‘)
else:
print(‘密码不一致‘)
# 数据处理
# import os
# import json
# base_path = os.path.dirname(os.path.dirname(__file__))
# db_path = os.path.join(base_path,‘db‘)
# with open(f‘{db_path}/{user}.json‘,‘w‘,encoding=‘utf-8‘) as f:
# json.dump(user_dic,f)
# f.flush()
# print(f‘{user}功能注册成功‘)
# break
# 登录
def login():
while True:
user = input(‘请输入用户名:‘).strip()
flag = user_interface.check_user_interface(user)
if not flag:
print(‘用户不存在,请重新输入‘)
continue
pwd = input(‘请输入密码:‘).strip()
# (True,f‘{user}登录成功!‘) = user_interface.login_interface(user,pwd)
# 调用登录接口
flag,msg = user_interface.login_interface(user,pwd)
if flag:
print(msg)
# 登录成功后做一个记录
user_info[‘user‘] = user
break
else:
print(msg)
# 查看余额
@common.login_auth
def check_bal():
print(‘查看余额...‘)
bal = user_interface.check_bal_interface(user_info[‘user‘])
print(bal)
@common.login_auth
# 提现
def whthdraw():
while True:
money = input(‘请输入需要体现金额:‘).strip()
if not money.isdigit():
print(‘请输入数字‘)
continue
money = int(money)
flag,msg = bank_interface.withdraw_interface(
user_info[‘user‘],money)
if flag:
print(msg)
break
else:
print(msg)
@common.login_auth
# 还款
def repay():
while True:
money = input(‘请输入还款金额‘).strip()
if not money.isdigit():
print(‘请输入数字‘)
continue
money = int(money)
flag,msg = bank_interface.repay_interface(user_info[‘user‘],money)
if flag:
print(msg)
break
@common.login_auth
# 转账
def transfer():
while True:
to_user = input(‘请输入转账目标用户‘).strip()
flag = user_interface.check_user_interface(to_user)
if not flag:
print(‘目标用户不存在‘)
continue
# 输入转账金额
money = input(‘请输入转账金额‘).strip()
if not money.isdigit():
print(‘请输入数字!‘)
continue
money = int(money)
flag,msg = bank_interface.transfer_interface(to_user,user_info[‘user‘],money)
if flag:
print(msg)
else:
print(msg)
@common.login_auth
# 查看流水
def check_flow():
flow_list = bank_interface.check_flow_interface(user_info[‘user‘])
for flow in flow_list:
print(flow)
@common.login_auth
# 购物车功能
def shopping():
good_list = [
[‘j哥牌辣椒酱‘,5],
[‘egon牌公仔‘,10],
[‘nick牌t-shirt‘,100],
[‘围城‘,39],
[‘tank牌坦克‘,50000],
]
# 调用查看余额接口
user_bal = user_interface.check_bal_interface(user_info[‘user‘])
shop_cart = {}
cose = 0
while True:
# 打印商品信息
for index,goot_price in enumerate(good_list):
print(index,goot_price)
# 选择商品编号或者输入q退出
choice = input(‘请输入商品编号或者输入q退出‘).strip()
if choice.isdigit():
choice = int(choice)
if choice >= 0 and choice < len(good_list):
goot_name,good_price = good_list[choice]
# 判断用户余额是否高于商品价格,否则无法购买
if user_bal >= good_price:
# 判断购物车当中是否有选择的商品,如果没有,将选择的商品加入购物车,如果有,将商品的数量加1
if goot_name in shop_cart:
shop_cart[goot_name] += 1
else:
shop_cart[goot_name] = 1
cose += good_price
else:
print(‘余额不足,不足以购买商品‘)
else:
print(‘请输入正确商品编号‘)
elif choice == ‘q‘:
commit = input(‘是否确认结账,请输入y/n:‘).strip()
if commit == ‘y‘:
# 调用购物商城支付功能,并调用银行支付接口
flag,msg = shop_interface.buy_shop_interface(user_info[‘user‘],cose)
if flag:
print(msg)
break
else:
print(msg)
elif commit == ‘n‘:
# 添加购物车接口调用
shop_interface.add_shop_cart_interface(user_info[‘user‘],shop_cart)
break
@common.login_auth
# 查看购物车
def check_shop_cart():
shop_cart = shop_interface.check_shop_cart_interface(user_info[‘user‘])
if not shop_cart:
print(‘购物车是空的,请选择购物功能‘)
print(shop_cart)
@common.login_auth
# 注销
def logout():
flag,msg = user_interface.logout_interface()
if flag:
print(msg)
# 冻结用户功能
def lock_user():
print(‘冻结用户....‘)
user = input(‘请输入需要冻结的用户:‘).strip()
flag = user_interface.check_user_interface(user)
if flag:
# 调用冻结接口
msg = admin_interface.lock_interface(user)
print(‘msg‘)
else:
print(‘用户不存在‘)
# 解冻用户
def unlock_user():
print(‘解冻用户....‘)
user = input(‘请输入需要解冻的用户:‘).strip()
flag = user_interface.check_user_interface(user)
if flag:
msg = admin_interface.unlock_interface(user)
print(msg)
else:
print(‘用户不存在!‘)
# 修改用户额度
def change_limit():
print(‘修改用户额度‘)
user = input(‘请输入需要修改额度的用户:‘).strip()
flag = user_interface.check_user_interface(user)
if not flag:
print(‘用户不存在‘)
return
limit = input(‘请输入修改额度:‘).strip()
if limit.isdigit():
limit = int(limit)
# 调用修改额度接口
msg = admin_interface.change_limit_interface(user,limit)
print(msg)
else:
print(‘请输入数字‘)
admin_func_dic = {
‘1‘: lock_user,
‘2‘: unlock_user,
‘3‘: change_limit
}
@common.login_auth
# 管理员
def admin_sys():
while True:
print(‘‘‘
1.冻结用户
2.解冻用户
3.用户额度
‘‘‘)
choice = input(‘请选择管理员功能‘).strip()
if choice not in admin_func_dic:
print(‘请重新输入‘)
continue
admin_func_dic[choice]()
# 字典容器
fun_dic = {
‘1‘: register,
‘2‘: login,
‘3‘: check_bal,
‘4‘: whthdraw,
‘5‘: repay,
‘6‘: transfer,
‘7‘: check_flow,
‘8‘: shopping,
‘9‘: check_shop_cart,
‘10‘: logout,
‘11‘: admin_sys
}
def run():
while True:
print(‘‘‘
1. 注册
2. 登录
3. 查看余额
4. 提现
5. 还款
6. 转账
7. 查看流水
8. 购物车功能
9. 查看购物车
10. 注销
11. 管理员功能
‘‘‘)
choice = input(‘请选择功能编号>>>‘).strip()
if choice == ‘q‘:
break
if choice in fun_dic:
fun_dic[choice]()
else:
print(‘请输入正确编号‘)
import os
import json
from conf import settings
# 保存数据
def save(user_dic):
user_path = f‘{settings.DB_PATH}/{user_dic["user"]}.json‘
with open(user_path,‘w‘, encoding=‘utf-8‘) as f:
json.dump(user_dic, f)
f.flush()
# 查看用户数据
def select(user):
# base_path = os.path.dirname(os.path.dirname(__file__))
# db_path = os.path.join(base_path, ‘db‘)
user_path = f‘{settings.DB_PATH}/{user}.json‘
if os.path.exists(user_path):
with open(user_path,‘r‘,encoding=‘utf-8‘) as f:
user_dic = json.load(f)
return user_dic
from db import db_handler
# 冻结用户接口
def lock_interface(user):
user_dic = db_handler.select(user)
user_dic[‘lock‘] = True
db_handler.save(user_dic)
return f‘{user}用户冻结成功‘
def unlock_interface(user):
user_dic = db_handler.select(user)
user_dic[‘lock‘] = False
db_handler.save(user_dic)
return f‘{user}用户解冻成功‘
# 修改额度接口
def change_limit_interface(user,limit):
user_dic = db_handler.select(user)
user_dic[‘balance‘] = limit
db_handler.save(user_dic)
return f‘修改用户{user}额度成功‘
from db import db_handler
# 提现接口
def withdraw_interface(user,money):
user_dic = db_handler.select(user)
if user_dic[‘balance‘] >= money * 1.05:
user_dic[‘balance‘] -= money * 1.05
msg = f‘用户[{user}] 提现[{money}]元成功!‘
user_dic[‘flow‘].append(msg)
db_handler.save(user_dic)
return True,msg
return False,‘余额不足!‘
# 还款接口
def repay_interface(user,money):
user_dic = db_handler.select(user)
user_dic[‘balance‘] += money
msg = f‘用户{user},还款{money}元成功!‘
user_dic[‘flow‘].append(msg)
db_handler.save(user_dic)
return True,msg
# 转账接口
def transfer_interface(to_user,from_user,money):
to_user_dic = db_handler.select(to_user)
from_user_dic = db_handler.select(from_user)
if from_user_dic[‘balance‘] >= money:
# 加减钱操作
from_user_dic[‘balance‘] -= money
to_user_dic[‘balance‘] += money
# 拼接流水
to_user_flow = f‘{to_user}用户接收到用户{from_user}转账{money}元‘
from_user_flow = f‘{from_user}用户给到用户{to_user}转账{money}元‘
# 记录流水
from_user_dic[‘flow‘].append(from_user_flow)
to_user_dic[‘flow‘].append(to_user_flow)
# 保存用户信息
db_handler.save(from_user_dic)
db_handler.save(to_user_dic)
return True,from_user_flow
return False,‘余额不足,请充值!‘
# 查看流水接口
def check_flow_interface(user):
user_dic = db_handler.select(user)
return user_dic[‘flow‘]
# 银行支付接口
def pay_interface(user,cost):
user_dic = db_handler.select(user)
if user_dic[‘balance‘] >= cost:
user_dic[‘balance‘] -= cost
user_dic[‘flow‘].append(f‘{user}用户支付{cost}成功‘)
db_handler.save(user_dic)
return True
else:
return False
from interface import bank_interface
from db import db_handler
# 商城结账接口
def buy_shop_interface(user,cost):
flag = bank_interface.pay_interface(user,cost)
if flag:
return True,‘购物成功!‘
else:
return False,‘余额不足,支付失败!‘
# 添加购物车接口
def add_shop_cart_interface(user,shop_cart):
user_dic = db_handler.select(user)
old_cart = user_dic[‘shop_cart‘]
# 循环当前购物车
for shop in shop_cart:
if shop in old_cart:
# 获取当前购物车中的商品数量
number = shop_cart[shop]
# 给用户信息中的商品数量做增值运算
old_cart[shop] += number
else:
number = shop_cart[shop]
old_cart[shop] = number
user_dic[‘shop_cart‘].update(old_cart)
db_handler.save(user_dic)
return True,‘添加购物车成功‘
# 查看购物车接口
def check_shop_cart_interface(user):
user_dic = db_handler.select(user)
return user_dic[‘shop_cart‘]
from db import db_handler
from lib import common
from lib import common
user_logger = common.get_logger(‘user‘)
# 查看用户接口
def check_user_interface(user):
# 调用数据处理层的select函数,查看用户信息
user_dic = db_handler.select(user)
if user_dic:
return True
import hashlib
# 注册接口
def register_interface(user,pwd,balance=15000):
pwd = common.get_md5(pwd)
user_dic = {
‘user‘: user,
‘pwd‘: pwd,
‘balance‘: 15000,
‘flow‘: [],
‘shop_cart‘: {},
‘lock‘: False
}
db_handler.save(user_dic)
user_logger.info(f‘{user_dic["user"]}功能注册成功!‘)
return f‘{user_dic["user"]}功能注册成功!‘
# 登录接口
def login_interface(user,pwd):
user_dic = db_handler.select(user)
pwd = common.get_md5(pwd)
if user_dic[‘lock‘]:
user_logger.info(f‘用户[{user}]已被冻结,请联系管理员!‘)
return False,‘用户已被冻结,请联系管理员!‘
if user_dic[‘pwd‘] == pwd:
user_logger.info(f‘{user}登录成功!‘)
return True,f‘{user}登录成功!‘
user_logger.info(f‘{user}输入密码错误‘)
return False,‘{user}密码错误‘
# 查看余额接口
def check_bal_interface(user):
user_dic = db_handler.select(user)
return user_dic[‘balance‘]
# 注销接口
def logout_interface():
from core import sre
user = src.user_info[‘user‘]
src.user_info[‘user‘] = None
return True,f‘{user}用户注销成功!‘
import hashlib
import logging.config
import os
import sys
comm_PATH = os.path.dirname(os.path.dirname(__file__))
sys.path.append(comm_PATH)
from conf import settings
# md5加密功能
def get_md5(pwd):
val = ‘天王盖地虎‘
md5 = hashlib.md5()
md5.update(pwd.encode(‘utf-8‘))
md5.update(val.encode(‘utf-8‘))
res = md5.hexdigest()
return res
# 登录认证功能
def login_auth(func):
from core import src
def inner(*args,**kwargs):
# 判断用户存在则执行被装饰函数
if src.user_info[‘user‘]:
res = func(*args,**kwargs)
return res
# 否则执行登录功能
else:
print(‘请先登录‘)
src.login()
return inner
# 日志功能
def get_logger(type_name):
logging.config.dictConfig(settings.LOGGING_DIC)
logger = logging.getLogger(type_name)
return logger
# 测试
# if __name__ == ‘__main__‘:
# logger = get_logger(‘tank‘)
# logger.info(‘jason is dsb‘)
目录创建即可,里面的日志文件会自动产生
这里不做过多截图展示啦~ 大家想试试直接复制代码即可哦~
标签:添加 interface http ret 业务逻辑 效果 ase flow opp
原文地址:https://www.cnblogs.com/tcy1/p/14537158.html