config.py配置文件
# 配置数据库信息
db_config = {
'host': 'localhost',
'port': '3306',
'database': 'bdpoi',
'username': 'bdpoi',
'password': 'rKADFaJxAG2b7Lfd'
}
操作语句
import json
from config import *
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text
app = Flask(__name__)
app.config['SECRET_KEY'] = 'xufeJjtC9Lsuv8m7D7ZZfph3a6MjpMpJ'
# url的格式为,数据库的协议://用户名:密码@ip地址:端口号(默认可以不写)/数据库名
app.config["SQLALCHEMY_DATABASE_URI"] = 'mysql://{username}:{password}@{host}:{port}/{database}'.format(**db_config)
# 动态追踪数据库的修改
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = True
app.config['SQLALCHEMY_ECHO'] = True
# 创建数据库的操作对象
db = SQLAlchemy(app)
'''
db.Column属性
primary_key 如果设为 True,这列就是表的主键
unique 如果设为 True,这列不允许出现重复的值
index 如果设为 True,为这列创建索引,提升查询效率
nullable 如果设为 True,这列允许使用空值;如果设为 False,这列不允许使用空值
default 为这列定义默认值
'''
class Poi(db.Model):
__tablename__ = "po_poi"
poi_id = db.Column(db.Integer, primary_key=True, autoincrement=True)
poi_name = db.Column(db.String(255), nullable=True)
poi_category_id = db.Column(db.Integer, nullable=True)
# repr()方法显示一个可读字符串
def __repr__(self):
return 'Poi:%s' % self.poi_name
# 默认首页
@app.route("/")
def get_index():
return "py-LockDataV API"
# 创建数据表
@app.route("/cre")
def create_index():
db.create_all()
return "创建数据表,状态:OK"
# 新增数据
@app.route("/add")
def add_index():
p1 = Poi(poi_name='宁波商会', poi_category_id="2")
p2 = Poi(poi_name='大海大厦', poi_category_id="1")
p3 = Poi(poi_name='国骅大厦', poi_category_id="2")
db.session.add_all([p1, p2, p3])
db.session.commit()
# 如何防止重复提交
return "新增数据,状态:OK"
# 删除数据
@app.route("/del")
@app.route("/del/")
@app.route("/del/<poi_id>")
def del_index(poi_id=None):
if poi_id:
# get_or_404 如果id不存在,抛出404页面
# del_id = Poi.query.get_or_404(poi_id)
# get方式
del_id = Poi.query.get(poi_id)
if del_id:
db.session.delete(del_id)
db.session.commit()
else:
return "Id不存在数据,状态:Fail"
return "删除数据,状态:OK"
else:
return "poi_id为空,越权操作"
# 查询数据
@app.route("/get")
def get_data():
# in查询
# Poi.query.filter(Poi.poi_name.in_('A', 'B', 'C', 'D'))
# 限制查询
# Poi.query.filter(poi_name=18).offset(2).limit(3) # 跳过二条开始查询,限制输出3条
# 排序
# results = Poi.query.order_by('poi_id') # 按默认升序,在前面加-号为降序'-age'
# results = Poi.query.order_by(text("-poi_id"))
results = Poi.query.filter_by(poi_category_id=2).all()
# 定义字典和序列
rows = []
data = {}
for rs in results:
row = {"poi_id": rs.poi_id, "poi_name": rs.poi_name, "poi_category_id": rs.poi_category_id}
rows.append(row)
data['code'] = 0
data['msg'] = 'OK'
data['data'] = rows
# 输出标准的JSON字符串
json_str = json.dumps(data, ensure_ascii=False)
return json_str
# 获取数据库记录API接口
@app.route("/poi")
@app.route("/poi/")
@app.route("/poi/<pid>")
def get_poi(pid=None):
# total = Poi.query.count()
# print(total)
if pid is None:
results = Poi.query.all()
else:
results = Poi.query.filter_by(poi_id=pid).all()
# 定义字典和序列
rows = []
data = {}
for rs in results:
row = {"poi_id": rs.poi_id, "poi_name": rs.poi_name, "poi_category_id": rs.poi_category_id}
rows.append(row)
data['code'] = 0
data['msg'] = 'OK'
data['data'] = rows
# 输出标准的JSON字符串
json_str = json.dumps(data, ensure_ascii=False)
return json_str
if __name__ == '__main__':
app.run(debug=True)
lockdatav Done!